fix: arbitrate brainlayer writes through queue drain #282
Conversation
📝 Walkthrough
This PR adds a durable JSONL write-arbitration queue and a single-writer drain daemon that applies queued events into SQLite under exclusive locks; arbitration is enabled via BRAINLAYER_ARBITRATED and integrated into the enrichment, MCP, watcher, and hook paths, plus launchd and CLI support.
Changes: Write Arbitration Queue and Drain Integration
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
Possibly related PRs
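To make the queue-then-drain shape concrete before the detailed findings below, here is a minimal, self-contained sketch of the pattern the walkthrough describes. It is not the PR's code: the real enqueue_store/drain_once in queue_io.py and drain.py handle locking, vectors, supersedes, and logging, and the table and signatures below are illustrative assumptions only.

```python
import json
import sqlite3
import time
import uuid
from pathlib import Path

def enqueue(queue_dir: Path, event: dict) -> Path:
    """Producer side: append one event as its own JSONL file, never touching SQLite."""
    queue_dir.mkdir(parents=True, exist_ok=True)
    final = queue_dir / f"{int(time.time() * 1000)}-{uuid.uuid4().hex}.jsonl"
    tmp = final.with_suffix(".tmp")
    tmp.write_text(json.dumps(event) + "\n", encoding="utf-8")
    tmp.replace(final)  # atomic rename so the drain never sees a partial file
    return final

def drain_once(db_path: Path, queue_dir: Path) -> int:
    """Single-writer side: apply every queued event inside one exclusive transaction."""
    files = sorted(queue_dir.glob("*.jsonl"))
    if not files:
        return 0
    conn = sqlite3.connect(str(db_path), isolation_level=None)  # manual transactions
    try:
        conn.execute("CREATE TABLE IF NOT EXISTS chunks (id TEXT PRIMARY KEY, content TEXT)")
        conn.execute("BEGIN IMMEDIATE")  # take the write lock up front
        for path in files:
            event = json.loads(path.read_text(encoding="utf-8"))
            conn.execute(
                "INSERT OR IGNORE INTO chunks (id, content) VALUES (?, ?)",
                (event["id"], event["content"]),
            )
        conn.execute("COMMIT")
    except Exception:
        if conn.in_transaction:
            conn.execute("ROLLBACK")
        raise
    finally:
        conn.close()
    for path in files:
        path.unlink()  # only delete queue files after a successful commit
    return len(files)
```

The key property is that producers only ever append files, and exactly one process ever opens a write transaction on the database.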
🚥 Pre-merge checks: ✅ 4 passed | ❌ 1 failed
❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 6d13e57008
ℹ️ About Codex in GitHub
Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".
| "tags": tags, | ||
| "importance": importance, | ||
| "supersedes": supersedes, | ||
| "metadata": {key: value for key, value in metadata.items() if value is not None}, |
Preserve entity links when draining stores
When BRAINLAYER_ARBITRATED=1, brain_store(entity_id=...) now goes through this queue path, but entity_id is only serialized into the chunk metadata and _apply_store never writes kg_entity_chunks or validates that the entity exists. That regresses the documented per-person memory contract: brain_get_person/entity-scoped searches read kg_entity_chunks, so memories stored with an entity in arbitrated mode become invisible to those flows even though the tool reports success.
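A possible shape for the missing link step, sketched under assumptions this thread does not confirm: that the entity table is named kg_entities and that kg_entity_chunks has (entity_id, chunk_id) columns. It would be called from _apply_store after the chunk insert.

```python
import sqlite3

def _link_entity_chunk(conn: sqlite3.Connection, entity_id: str, chunk_id: str) -> None:
    """Hypothetical drain-side helper mirroring the direct store path's entity link."""
    # Validate the entity first so a bad entity_id fails at drain time instead of
    # silently leaving the memory invisible to brain_get_person / entity-scoped search.
    if conn.execute("SELECT 1 FROM kg_entities WHERE id = ?", (entity_id,)).fetchone() is None:
        raise ValueError(f"unknown entity_id {entity_id!r} for chunk {chunk_id}")
    conn.execute(
        "INSERT OR IGNORE INTO kg_entity_chunks (entity_id, chunk_id) VALUES (?, ?)",
        (entity_id, chunk_id),
    )
```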
| for vector_table in ("chunk_vectors_dense", "chunk_vectors_binary"): | ||
| if conn.execute("SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)).fetchone(): | ||
| conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,)) |
Delete superseded vectors from the actual vector table
For queued brain_store(..., supersedes=...) writes, this loop never removes the old dense vector because the schema uses chunk_vectors, not chunk_vectors_dense (the existing VectorStore._delete_chunk_vector deletes chunk_vectors and chunk_vectors_binary). The old superseded chunk can therefore remain in the KNN table and consume vector candidates before the superseded_by IS NULL filter removes it, degrading retrieval after supersedes in arbitrated mode.
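The fix this implies is mostly swapping the table tuple to match what VectorStore._delete_chunk_vector already targets; a sketch (table names taken from the review text, not verified against the schema):

```python
def _delete_superseded_vectors(conn, superseded_chunk_id: str) -> None:
    # Mirror VectorStore._delete_chunk_vector: dense vectors live in chunk_vectors,
    # not chunk_vectors_dense, plus the binary table.
    for vector_table in ("chunk_vectors", "chunk_vectors_binary"):
        exists = conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?",
            (vector_table,),
        ).fetchone()
        if exists:
            # Table name comes from a fixed tuple, so the f-string is not user-injectable here.
            conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (superseded_chunk_id,))
```

Note that a later Codex comment on this PR adds a second requirement: if chunk_vectors is a vec0 virtual table, the drain's connection must also load the sqlite-vec extension before this DELETE will work.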
🟢 Low: brainlayer/src/brainlayer/vector_store.py, line 925 in 6d13e57
When chunks_fts_trigram is first created on an existing database (lines 280–285), existing chunks are never indexed because the backfill at lines 925–931 only populates chunks_fts. Trigram searches will silently return empty results for all pre-existing data unless the user manually sets BRAINLAYER_REPAIR=1. Consider adding chunks_fts_trigram to the same backfill block, or creating it with the same IF NOT EXISTS pattern that prevents empty tables from going unnoticed.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file src/brainlayer/vector_store.py around line 925:
When `chunks_fts_trigram` is first created on an existing database (lines 280–285), existing chunks are never indexed because the backfill at lines 925–931 only populates `chunks_fts`. Trigram searches will silently return empty results for all pre-existing data unless the user manually sets `BRAINLAYER_REPAIR=1`. Consider adding `chunks_fts_trigram` to the same backfill block, or creating it with the same `IF NOT EXISTS` pattern that prevents empty tables from going unnoticed.
Evidence trail:
src/brainlayer/vector_store.py lines 280-285: `chunks_fts_trigram` created with `IF NOT EXISTS`.
src/brainlayer/vector_store.py lines 925-931: backfill only populates `chunks_fts`, no backfill for `chunks_fts_trigram`.
src/brainlayer/vector_store.py lines 306-318: triggers for `chunks_fts_trigram_insert` only fire on new inserts.
src/brainlayer/vector_store.py lines 940-950: `repair_fts` method rebuilds `chunks_fts_trigram` but is only called when `BRAINLAYER_REPAIR=1` (line 367).
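As a rough illustration of the suggested fix, the backfill could treat both FTS tables the same way. This sketch assumes both are FTS5 tables indexed by the chunks rowid with a single content column, which this thread does not confirm; the real backfill at lines 925-931 should be the reference.

```python
def _backfill_fts(conn) -> None:
    # Backfill any empty FTS index from the existing chunks, so a freshly created
    # chunks_fts_trigram does not silently return no results for pre-existing data.
    for fts_table in ("chunks_fts", "chunks_fts_trigram"):
        row = conn.execute(f"SELECT count(*) FROM {fts_table}").fetchone()
        if row and row[0] == 0:
            conn.execute(
                f"INSERT INTO {fts_table} (rowid, content) SELECT rowid, content FROM chunks"
            )
```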
Actionable comments posted: 10
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/brainlayer/cli/__init__.py`:
- Around line 1057-1064: The repair_fts CLI command currently instantiates
VectorStore with DEFAULT_DB_PATH which bypasses runtime DB path resolution;
change it to call get_db_path() from paths (import get_db_path) and pass that
result into VectorStore so the command uses the configured runtime database path
instead of the hardcoded DEFAULT_DB_PATH; update the import and the
VectorStore(DEFAULT_DB_PATH) usage in repair_fts accordingly.
- Around line 2097-2140: The code is hardcoding DEFAULT_DB_PATH for
migration/drain; change path resolution to use get_db_path() from ..paths
everywhere: replace queue_path = DEFAULT_DB_PATH.parent / "pending-stores.jsonl"
with queue_path = get_db_path().parent / "pending-stores.jsonl", and pass
db_path=get_db_path() into drain_once (and any other calls that currently use
DEFAULT_DB_PATH); ensure get_db_path is imported from ..paths at the top of the
file and leave enqueue_store and get_queue_dir usage unchanged.
- Around line 2109-2113: Wrap the json.loads(line) call in a try/except that
catches JSONDecodeError (json.decoder.JSONDecodeError) so a single malformed
JSONL row does not abort the whole flush migration; on decode error increment
the existing skipped counter (and optionally log/debug the bad line) and
continue to the next line, then proceed with the existing content =
item.get("content") / if not content handling as before.
In `@src/brainlayer/drain.py`:
- Around line 21-22: Replace the duplicated DB path resolution in
_default_db_path with a call to the shared helper: import and return
get_db_path() from paths.py instead of reconstructing Path(os.environ.get(...)).
Update the top of the file to import get_db_path and change the _default_db_path
function to simply return get_db_path() so all DB path logic is centralized in
get_db_path().
- Around line 140-168: In _apply_hook, the timestamp parsing uses
float(event.get("timestamp") or time.time()) which can raise ValueError for
non-numeric strings; instead, first extract ts_raw = event.get("timestamp"), try
to convert ts = float(ts_raw) in a try/except (catch ValueError/TypeError) and
fall back to ts = time.time() on error or None, optionally logging a warning
about the invalid timestamp, then use datetime.fromtimestamp(ts,
timezone.utc).isoformat() when building the chunk payload.
- Around line 237-282: The drain_once function currently aborts on any exception
(including sqlite3.OperationalError(SQLITE_BUSY)) during BEGIN IMMEDIATE and
returns 0; change it to detect SQLITE_BUSY (sqlite3.OperationalError with
"database is locked" or sqlite3.busy_error) and retry the transaction with
exponential backoff (e.g., backoff loop with sleep increasing and a max
attempts) when that specific error occurs while still rolling back on other
exceptions; keep using a fresh sqlite3.connect per drain_once invocation,
perform BEGIN IMMEDIATE and the read/apply loop (_read_events, _apply_event)
inside the retryable block, log retry attempts and final failures via
_log(log_path, ...), and only return 0 after exhausting retries or on
non-SQLITE_BUSY exceptions.
In `@src/brainlayer/enrichment_controller.py`:
- Around line 667-668: The code currently skips calling
_ensure_enrichment_columns(store) when _arbitrated_writes_enabled() is true,
which allows _is_duplicate_content() to read chunks.content_hash on DBs that
lack the column and silently disable deduping; fix this by guaranteeing the
content_hash schema is bootstrapped before any dedup reads: either call
_ensure_enrichment_columns(store) unconditionally at process startup or move the
call into the single-writer path (the same place where arbitration is
initialized) so that _ensure_enrichment_columns(store) runs before any
_is_duplicate_content() calls; make the change around the sites referencing
_arbitrated_writes_enabled() (the checks that currently skip
_ensure_enrichment_columns) and add an explicit comment/flag describing the
DB/schema-risk and that this is intentionally run in the single-writer/migration
path.
In `@src/brainlayer/mcp/store_handler.py`:
- Around line 503-526: The arbitrated fast path currently calls
enqueue_store(...) directly which can fail and bypass the durable fallback in
_queue_store; replace the direct call with a call to _queue_store(...) using the
same arguments (include supersedes, source="mcp", and all other kwargs like
content, memory_type, project, tags, importance, confidence_score, outcome,
reversibility, files_changed, entity_id, status, severity, file_path,
function_name, line_number) so the arbitrated branch reuses the durable
queuing/lock behavior; keep the existing return path that constructs structured
= {"chunk_id": "queued", "related": []} and returns ([TextContent(type="text",
text=format_store_result("queued", queued=True))], structured) after
_queue_store completes.
In `@src/brainlayer/vector_store.py`:
- Around line 939-950: The repair_fts function performs large delete/insert
writes and must be made contention-safe: wrap the delete/insert in an explicit
transaction (e.g., BEGIN IMMEDIATE / COMMIT) using the connection on the current
worker (ensure each worker uses its own sqlite3 connection), run PRAGMA
wal_checkpoint(FULL) before starting and after finishing the bulk writes, and
add a SQLITE_BUSY retry/backoff loop around the entire transaction (retry on
sqlite3.OperationalError with "database is locked" or SQLITE_BUSY, with a short
exponential backoff and a bounded number of attempts) so the cursor.execute
calls inside repair_fts (the DELETE, INSERT, and COUNT statements) are retried
atomically until success or max attempts. Ensure you call conn.commit() on
success and rollback on failure.
In `@tests/test_arbitration.py`:
- Around line 116-118: Add an assertion that the sanitized filename matches the
expected safe format: use queued_path.name and assert it fully matches a
whitelist regex (e.g., import re and assert re.fullmatch(r'^[A-Za-z0-9_.-]+$',
queued_path.name)) to ensure unsafe characters are replaced with allowed
characters; optionally also assert an exact sanitized string if you know the
replacement rules.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 0c32d0d9-9e8b-42c4-97d4-df734afa2085
📒 Files selected for processing (17)
- scripts/drain_daemon.py
- scripts/launchd/com.brainlayer.drain.plist
- scripts/launchd/com.brainlayer.enrichment.plist
- scripts/launchd/com.brainlayer.repair-fts.plist
- scripts/launchd/com.brainlayer.watch.plist
- scripts/launchd/install.sh
- src/brainlayer/cli/__init__.py
- src/brainlayer/drain.py
- src/brainlayer/enrichment_controller.py
- src/brainlayer/hooks/indexer.py
- src/brainlayer/mcp/store_handler.py
- src/brainlayer/queue_io.py
- src/brainlayer/vector_store.py
- src/brainlayer/watcher_bridge.py
- tests/test_arbitration.py
- tests/test_search_trigram_fts.py
- tests/test_write_queue.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (5)
- GitHub Check: Cursor Bugbot
- GitHub Check: Macroscope - Correctness Check
- GitHub Check: test (3.11)
- GitHub Check: test (3.13)
- GitHub Check: test (3.12)
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior
Enforce one-write-at-a-time concurrency constraint; reads are safe but brain_digest is write-heavy and must not run in parallel with other MCP work
Run pytest before claiming behavior changed safely; current test suite has 929 tests
**/*.py: Use `paths.py:get_db_path()` for all database path resolution; all scripts and CLI must use this function rather than hardcoding paths
When performing bulk database operations: stop enrichment workers first, checkpoint WAL before and after, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks, and checkpoint every 3 batches
Files:
tests/test_arbitration.py, scripts/drain_daemon.py, tests/test_write_queue.py, src/brainlayer/watcher_bridge.py, src/brainlayer/hooks/indexer.py, src/brainlayer/mcp/store_handler.py, src/brainlayer/queue_io.py, tests/test_search_trigram_fts.py, src/brainlayer/enrichment_controller.py, src/brainlayer/drain.py, src/brainlayer/cli/__init__.py, src/brainlayer/vector_store.py
src/brainlayer/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/brainlayer/**/*.py: Use retry logic on `SQLITE_BUSY` errors; each worker must use its own database connection to handle concurrency safely
Classification must preserve `ai_code`, `stack_trace`, and `user_message` verbatim; skip `noise` entries entirely and summarize `build_log` and `dir_listing` entries (structure only)
Use AST-aware chunking via tree-sitter; never split stack traces; mask large tool output
For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Configure enrichment rate via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Implement chunk lifecycle columns: `superseded_by`, `aggregated_into`, `archived_at` on chunks table; exclude lifecycle-managed chunks from default search; allow `include_archived=True` to show history
Implement `brain_supersede` with safety gate for personal data (journals, notes, health/finance); use soft-delete for `brain_archive` with timestamp
Add `supersedes` parameter to `brain_store` for atomic store-and-replace operations
Run linting and formatting with: `ruff check src/ && ruff format src/`
Run tests with `pytest`
Use `PRAGMA wal_checkpoint(FULL)` before and after bulk database operations to prevent WAL bloat
Files:
src/brainlayer/watcher_bridge.py, src/brainlayer/hooks/indexer.py, src/brainlayer/mcp/store_handler.py, src/brainlayer/queue_io.py, src/brainlayer/enrichment_controller.py, src/brainlayer/drain.py, src/brainlayer/cli/__init__.py, src/brainlayer/vector_store.py
🪛 OpenGrep (1.20.0)
src/brainlayer/drain.py
[ERROR] 41-41: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)
[ERROR] 106-106: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)
[ERROR] 210-210: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)
🔇 Additional comments (17)
tests/test_arbitration.py (1)
59-90: LGTM!
tests/test_search_trigram_fts.py (1)
64-85: LGTM!
tests/test_write_queue.py (1)
29-61: LGTM! Also applies to: 213-246
src/brainlayer/enrichment_controller.py (1)
171-197: LGTM! Also applies to: 676-679, 720-723, 855-860, 869-876, 928-933, 964-971
src/brainlayer/mcp/store_handler.py (1)
392-399: LGTM!
src/brainlayer/watcher_bridge.py (1)
16-16: LGTM! Also applies to: 26-26, 207-208, 215-317
src/brainlayer/hooks/indexer.py (1)
21-22: LGTM! Also applies to: 31-31, 208-215
src/brainlayer/queue_io.py (4)
13-22: LGTM!
25-40: LGTM!
43-103: LGTM!
106-147: LGTM!
src/brainlayer/drain.py (6)
33-41: LGTM!
44-54: LGTM!
57-107: LGTM!
171-223: LGTM!
226-234: LGTM!
285-302: LGTM!
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 41917c2975
| source="mcp", | ||
| ) | ||
| structured = {"chunk_id": "queued", "related": []} | ||
| return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured) |
Run the post-store pipeline for queued memories
When BRAINLAYER_ARBITRATED=1, this early return reports success before the non-arbitrated path starts the background worker that calls embed_pending_chunks and enrich_single. The new drain path only inserts the row and sets enriched_at immediately, so queued brain_store memories never receive chunk_vectors and are also excluded from the normal enrichment candidate query (enriched_at IS NULL), leaving them permanently missing from semantic recall and enrichment-derived tags/entities unless an operator runs a separate manual backfill.
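One way to read this finding as a fix, sketched with heavy assumptions: leave drained stores eligible for the normal pipelines instead of stamping them done, and kick the existing backfill after the drain commits. The enriched_at column and embed_pending_chunks are named in the comment above; everything else about the wiring below is assumed.

```python
import sqlite3

def after_drain_commit(db_path: str, drained_chunk_ids: list[str]) -> None:
    """Hypothetical post-drain step; names beyond enriched_at are assumptions."""
    conn = sqlite3.connect(db_path)
    try:
        # 1. Leave the drained rows eligible for enrichment instead of marking them
        #    done, so the normal "enriched_at IS NULL" candidate query picks them up.
        conn.executemany(
            "UPDATE chunks SET enriched_at = NULL WHERE id = ?",
            [(chunk_id,) for chunk_id in drained_chunk_ids],
        )
        conn.commit()
    finally:
        conn.close()
    # 2. Run the same embedding backfill the non-arbitrated background worker runs
    #    (embed_pending_chunks), so the rows gain chunk_vectors and stay reachable by
    #    semantic search; how that helper is invoked is not shown in this thread.
```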
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 139ff034af
if conn.execute(
    "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)
).fetchone():
    conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,))
Load sqlite-vec before deleting superseded vectors
When an arbitrated brain_store(..., supersedes=...) is drained against a normal BrainLayer DB, chunk_vectors is the vec0 virtual table created by VectorStore, but this drain path opens the DB with stdlib sqlite3 and never loads the sqlite-vec extension. This DELETE FROM chunk_vectors raises OperationalError: no such module: vec0, causing drain_once to roll back, return 0, and leave the same queue file to poison subsequent drain batches.
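A sketch of the fix this implies, assuming the sqlite-vec Python bindings are importable in the drain's environment (the tests in this PR already load the same extension via apsw):

```python
import sqlite3

import sqlite_vec

def _open_drain_connection(db_path: str) -> sqlite3.Connection:
    """Open the drain's connection with the vec0 module available."""
    conn = sqlite3.connect(db_path)
    conn.enable_load_extension(True)
    try:
        sqlite_vec.load(conn)  # registers vec0 so DELETE FROM chunk_vectors can work
    finally:
        conn.enable_load_extension(False)
    return conn
```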
)


def _apply_hook(conn: sqlite3.Connection, event: dict[str, Any]) -> None:
🟢 Low brainlayer/drain.py:156
When event["timestamp"] is a string that parses as float("inf") or an extremely large number (e.g., 1e50), the float() conversion on line 170 succeeds, but datetime.fromtimestamp() on line 186 raises OSError or OverflowError — neither of which are caught. This crashes the entire batch instead of falling back to the current time.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file src/brainlayer/drain.py around line 156:
When `event["timestamp"]` is a string that parses as `float("inf")` or an extremely large number (e.g., `1e50`), the `float()` conversion on line 170 succeeds, but `datetime.fromtimestamp()` on line 186 raises `OSError` or `OverflowError` — neither of which are caught. This crashes the entire batch instead of falling back to the current time.
Evidence trail:
src/brainlayer/drain.py lines 156-190: `_apply_hook` function with try/except catching only TypeError/ValueError at float() conversion (lines ~170-173), then calling `datetime.fromtimestamp(timestamp, timezone.utc)` unprotected (line ~186). src/brainlayer/drain.py lines 296-327: outer drain loop where uncaught exceptions cause `conn.rollback()` and batch failure.
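A defensive parse covering both this finding and the earlier non-numeric-timestamp prompt might look like the following; it is illustrative only and not drain.py's actual helper.

```python
import time
from datetime import datetime, timezone

def _event_iso_timestamp(raw) -> str:
    """Best-effort UTC ISO timestamp for a queued event; never raises."""
    try:
        ts = float(raw) if raw is not None else time.time()
        return datetime.fromtimestamp(ts, timezone.utc).isoformat()
    except (TypeError, ValueError, OverflowError, OSError):
        # Covers non-numeric strings as well as inf / out-of-range values,
        # so one bad event cannot roll back the whole drain batch.
        return datetime.now(timezone.utc).isoformat()
```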
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 922aafd18d
| if os.environ.get("BRAINLAYER_ARBITRATED") == "1": | ||
| _queue_store( | ||
| { | ||
| "content": content, | ||
| "memory_type": memory_type, | ||
| "project": _normalize_project_name(project), | ||
| "tags": tags, | ||
| "importance": importance, | ||
| "confidence_score": confidence_score, | ||
| "outcome": outcome, | ||
| "reversibility": reversibility, | ||
| "files_changed": files_changed, | ||
| "entity_id": entity_id, | ||
| "status": status, | ||
| "severity": severity, | ||
| "file_path": file_path, | ||
| "function_name": function_name, | ||
| "line_number": line_number, | ||
| "supersedes": supersedes, | ||
| } | ||
| ) | ||
| structured = {"chunk_id": "queued", "related": []} | ||
| return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured) |
Validate content before enqueueing stores
When BRAINLAYER_ARBITRATED=1, this branch bypasses the store_memory validation that rejects blank content and system-prompt-looking content. The drain path only skips blank content after the tool has already returned a queued success, and it does not run the system-prompt filter at all, so arbitrated brain_store can either report a successful write that is silently dropped or persist content the normal path intentionally blocks.
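One minimal way to close the gap is to run the same checks before enqueueing that store_memory runs before writing. The sketch below only pins down the blank-content check; the system-prompt predicate is a hypothetical stand-in for whatever filter store_memory actually applies.

```python
def _looks_like_system_prompt(text: str) -> bool:
    # Placeholder heuristic only; the real filter lives in store_memory.
    lowered = text.lower()
    return lowered.startswith("you are ") or "system prompt" in lowered

def _validate_store_content(content: str) -> None:
    """Run before _queue_store so 'queued' is never reported for content that would be rejected."""
    if not content or not content.strip():
        raise ValueError("brain_store: content must not be blank")
    if _looks_like_system_prompt(content):
        raise ValueError("brain_store: content rejected by system-prompt filter")
```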
| structured = {"chunk_id": "queued", "related": []} | ||
| return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured) |
Clear cached search results for queued stores
In arbitrated mode the MCP process returns after enqueueing without invalidating the in-process hybrid search cache, unlike the normal store_memory path which clears it after writes. If a user searches for a query, then stores a matching memory and the drain writes it, repeating the same brain_search in the same MCP process can keep returning the pre-store cached results for up to the cache TTL, making the newly stored memory appear missing.
Actionable comments posted: 9
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/brainlayer/mcp/store_handler.py (1)
392-422: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win
Serialize the legacy fallback queue writes.
If enqueue_store(...) fails, this falls back to pending-stores.jsonl, but the append/trim/rewrite path is still unlocked. Under concurrent MCP callers, one fallback writer can rename a trimmed file over another writer's append and drop queued stores. The fallback needs a file lock around the full write+trim sequence, or it is not durable under the same failure that triggered the fallback.
As per coding guidelines, "Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/brainlayer/mcp/store_handler.py` around lines 392 - 422, The fallback writing to pending-stores.jsonl must be serialized: wrap the entire append + read/trim/rewrite sequence with a file lock to prevent concurrent writers from losing entries; after computing path = _get_pending_store_path() and ensuring path.parent exists, obtain an exclusive lock (e.g., on path or path.with_suffix(".lock")) before opening for append, writing json.dumps(item) + "\n", reading path.read_text(), trimming to _QUEUE_MAX_SIZE, writing to a temporary file and renaming, and always release the lock in a finally block; update the code around enqueue_store, path, and _QUEUE_MAX_SIZE and preserve existing logging (logger.debug/warning) for errors.
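For the fallback path, an advisory lock held across the whole append, trim, and rename sequence is enough on POSIX; a sketch, with the path handling and the max-size name mirroring the comment rather than the actual code:

```python
import fcntl
import json
from pathlib import Path

def _append_and_trim_locked(path: Path, item: dict, max_size: int) -> None:
    """Serialize fallback writes to pending-stores.jsonl across concurrent MCP callers."""
    path.parent.mkdir(parents=True, exist_ok=True)
    lock_path = path.with_suffix(".lock")
    with lock_path.open("w") as lock:
        fcntl.flock(lock, fcntl.LOCK_EX)  # held for append + trim + rename
        try:
            with path.open("a", encoding="utf-8") as handle:
                handle.write(json.dumps(item) + "\n")
            lines = path.read_text(encoding="utf-8").splitlines(keepends=True)
            if len(lines) > max_size:
                tmp = path.with_suffix(".tmp")
                tmp.write_text("".join(lines[-max_size:]), encoding="utf-8")
                tmp.replace(path)
        finally:
            fcntl.flock(lock, fcntl.LOCK_UN)
```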
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/brainlayer/cli/__init__.py`:
- Around line 2120-2133: The current chunk_id generation uses a truncated SHA256
of stable_payload built from only content, memory_type, project, tags, and
importance, causing distinct legacy rows to collide; update the stable_payload
used for the hash (the object built just before chunk_id) to include all unique
legacy identifying fields from item (e.g., severity, file_path, line_number,
entity_id and any other provenance fields present on item) so the hash covers
those attributes before computing chunk_id for the chunk_id construction code
that references stable_payload and item.
In `@src/brainlayer/drain.py`:
- Around line 416-418: Currently the code calls _embed_store_chunks(conn,
store_chunk_ids, embed_fn) while the transaction is still open, holding the
SQLite write lock; change the order so the transaction is committed before
invoking the embedder: move or duplicate conn.execute("COMMIT") to occur before
calling _embed_store_chunks (i.e., call conn.execute("COMMIT") when should_embed
is true and only then call _embed_store_chunks), ensuring the BEGIN
IMMEDIATE/transaction is closed before any slow/remote embedding work; update
any surrounding logic that assumes conn is usable after commit and add a brief
comment flagging this as a risky DB/concurrency change.
- Around line 107-145: The code currently does float(event["importance"]) during
chunk insertion which will raise on malformed input and abort the drain; update
the logic around the "importance" field (in the block that builds the dict for
_insert_chunk in drain.py) to coerce defensively: attempt to parse
event.get("importance") inside a try/except catching ValueError/TypeError and on
failure set importance to None (or simply omit the key) so a bad value won't
throw and roll back the transaction; ensure this change is applied where
_insert_chunk is called and does not re-raise so only the bad event is skipped
or inserted with no importance instead of poisoning the queue.
In `@src/brainlayer/enrichment_controller.py`:
- Around line 171-181: The queued enrichment helper _enqueue_enrichment_write
currently drops the chunk's storage context so the drain cannot pick the correct
SQLite DB; modify the call to enqueue_enrichment_update to pass through the
store/db_path from the chunk (e.g., chunk.get("store") or chunk.get("db_path"))
alongside chunk_id, enrichment, content_hash and entities so the queued intent
includes the target DB; update any callers or the enqueue_enrichment_update
signature if needed to accept and forward this store/db_path field to the drain
to ensure writes go to the originating store.
In `@src/brainlayer/mcp/store_handler.py`:
- Around line 503-525: The queued-item path must preserve the supersedes field
so a fallback replay preserves atomic replace; update the replay path in
_flush_pending_stores to read the stored record's "supersedes" and pass it
through to store_memory (and any intermediate call like brain_store/store_memory
wrappers) when replaying pending-stores.jsonl entries; ensure function
signatures used in replay accept and forward supersedes (e.g., store_memory,
brain_store) so the atomic store-and-replace semantics are retained on failure
paths.
In `@src/brainlayer/queue_io.py`:
- Around line 117-125: The code currently sets the "timestamp" field using
"timestamp or time.time()", which treats 0 (and other falsy but valid values) as
missing; change this to use an explicit None check so falsy timestamps like 0
are preserved (e.g., set "timestamp" to timestamp if timestamp is not None else
time.time()); update the dict passed to enqueue_jsonl (the "timestamp" key)
accordingly so enqueue_jsonl receives the correct timestamp value.
- Around line 36-39: The current write uses tmp_path.write_text(...) then
tmp_path.replace(final_path) which can lose data on crash because neither the
file contents nor the containing directory are fsynced; modify the sequence
around tmp_path and final_path so you open the temp file (e.g.
tmp_path.open("wb")), write bytes, call file.flush() and
os.fsync(file.fileno()), then atomically move the temp to final (os.replace or
tmp_path.replace), and afterward open the parent directory
(os.open(final_path.parent, os.O_DIRECTORY)) and call os.fsync(dir_fd) before
closing it; update the code paths that currently call tmp_path.write_text and
tmp_path.replace to use this safe write+fsync+dir fsync pattern.
In `@tests/test_arbitration.py`:
- Around line 343-348: The test currently creates stale_lock then immediately
unlinks it so drain_once never sees a sentinel; remove the premature deletion
and let drain_once handle the stale file: create stale_lock = queue_dir /
".drain.lock", write the sentinel, do not call stale_lock.unlink() before
invoking drain_once(db_path=db_path, queue_dir=queue_dir), then assert
drain_once(...) == 1 and assert not stale_lock.exists() afterwards so the test
verifies that drain_once detects and removes a stale .drain.lock; reference the
stale_lock variable and the drain_once(...) call to locate the change.
- Around line 82-87: The helper _connect_apsw currently returns a raw
apsw.Connection which used in tests as "with _connect_apsw(...) as conn:" only
manages transactions and does not close the connection, leaking SQLite handles;
change _connect_apsw into a proper closing context manager (e.g., use
contextlib.contextmanager or return an object with __enter__/__exit__) that
creates the apsw.Connection, enables/loads extensions as before, yields the
connection, and always calls conn.close() in the exit/finally path; update any
helper callers like _create_vec_db (and all tests already using "with
_connect_apsw(...) as conn:") to rely on the new context manager so connections
are closed after each with-block.
---
Outside diff comments:
In `@src/brainlayer/mcp/store_handler.py`:
- Around line 392-422: The fallback writing to pending-stores.jsonl must be
serialized: wrap the entire append + read/trim/rewrite sequence with a file lock
to prevent concurrent writers from losing entries; after computing path =
_get_pending_store_path() and ensuring path.parent exists, obtain an exclusive
lock (e.g., on path or path.with_suffix(".lock")) before opening for append,
writing json.dumps(item) + "\n", reading path.read_text(), trimming to
_QUEUE_MAX_SIZE, writing to a temporary file and renaming, and always release
the lock in a finally block; update the code around enqueue_store, path, and
_QUEUE_MAX_SIZE and preserve existing logging (logger.debug/warning) for errors.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: 03fa3947-adb4-44a8-accd-dad769e65d7d
📒 Files selected for processing (9)
- README.md
- docs/arbitration.md
- src/brainlayer/cli/__init__.py
- src/brainlayer/drain.py
- src/brainlayer/enrichment_controller.py
- src/brainlayer/mcp/store_handler.py
- src/brainlayer/queue_io.py
- src/brainlayer/vector_store.py
- tests/test_arbitration.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (4)
- GitHub Check: test (3.13)
- GitHub Check: test (3.12)
- GitHub Check: test (3.11)
- GitHub Check: Macroscope - Correctness Check
🧰 Additional context used
📓 Path-based instructions (2)
**/*.py
📄 CodeRabbit inference engine (AGENTS.md)
**/*.py: Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior
Enforce one-write-at-a-time concurrency constraint; reads are safe but brain_digest is write-heavy and must not run in parallel with other MCP work
Run pytest before claiming behavior changed safely; current test suite has 929 tests
**/*.py: Use `paths.py:get_db_path()` for all database path resolution; all scripts and CLI must use this function rather than hardcoding paths
When performing bulk database operations: stop enrichment workers first, checkpoint WAL before and after, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks, and checkpoint every 3 batches
Files:
src/brainlayer/vector_store.py, src/brainlayer/mcp/store_handler.py, src/brainlayer/enrichment_controller.py, tests/test_arbitration.py, src/brainlayer/queue_io.py, src/brainlayer/cli/__init__.py, src/brainlayer/drain.py
src/brainlayer/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
src/brainlayer/**/*.py: Use retry logic on `SQLITE_BUSY` errors; each worker must use its own database connection to handle concurrency safely
Classification must preserve `ai_code`, `stack_trace`, and `user_message` verbatim; skip `noise` entries entirely and summarize `build_log` and `dir_listing` entries (structure only)
Use AST-aware chunking via tree-sitter; never split stack traces; mask large tool output
For enrichment backend selection: use Groq as primary backend (cloud, configured in launchd plist), Gemini as fallback via `enrichment_controller.py`, and Ollama as offline last-resort; allow override via `BRAINLAYER_ENRICH_BACKEND` env var
Configure enrichment rate via `BRAINLAYER_ENRICH_RATE` environment variable (default 0.2 = 12 RPM)
Implement chunk lifecycle columns: `superseded_by`, `aggregated_into`, `archived_at` on chunks table; exclude lifecycle-managed chunks from default search; allow `include_archived=True` to show history
Implement `brain_supersede` with safety gate for personal data (journals, notes, health/finance); use soft-delete for `brain_archive` with timestamp
Add `supersedes` parameter to `brain_store` for atomic store-and-replace operations
Run linting and formatting with: `ruff check src/ && ruff format src/`
Run tests with `pytest`
Use `PRAGMA wal_checkpoint(FULL)` before and after bulk database operations to prevent WAL bloat
Files:
src/brainlayer/vector_store.py, src/brainlayer/mcp/store_handler.py, src/brainlayer/enrichment_controller.py, src/brainlayer/queue_io.py, src/brainlayer/cli/__init__.py, src/brainlayer/drain.py
🪛 OpenGrep (1.20.0)
src/brainlayer/drain.py
[ERROR] 72-72: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)
[ERROR] 144-144: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)
[ERROR] 271-271: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)
🔇 Additional comments (1)
tests/test_arbitration.py (1)
220-234: ⚡ Quick win
The polling loop after drain_once is necessary due to WAL visibility timing, not async embedding writes.
drain_once commits embeddings synchronously within its transaction (lines 417–418 of drain.py) before returning. The _embed_store_chunks() call executes INSERT statements directly on the connection before COMMIT, so embeddings are part of the committed WAL transaction, not fire-and-forget or asynchronous.
The 2-second polling loop is likely defensive code accounting for WAL checkpoint visibility behavior when mixing sqlite3.connect() (used in test setup) with APSW connections (used for queries). Similar polling appears in other tests like test_drain_daemon_serializes_three_concurrent_producers. If the polling is consistently unnecessary in CI, consider removing it; if it occasionally times out on slow machines, document the WAL visibility behavior instead.
stable_payload = {
    "content": content,
    "memory_type": item.get("memory_type", "note"),
    "project": item.get("project"),
    "tags": item.get("tags"),
    "importance": item.get("importance"),
}
chunk_id = (
    item.get("chunk_id")
    or "pending-"
    + hashlib.sha256(
        json.dumps(stable_payload, sort_keys=True, separators=(",", ":")).encode("utf-8")
    ).hexdigest()[:16]
)
Legacy migration ID hash can collapse distinct records.
Line 2120–Line 2133 builds chunk_id from only five fields. Distinct legacy rows (e.g., different severity, file_path, line_number, entity_id) can map to the same ID and be merged/dropped downstream.
Suggested fix
- stable_payload = {
- "content": content,
- "memory_type": item.get("memory_type", "note"),
- "project": item.get("project"),
- "tags": item.get("tags"),
- "importance": item.get("importance"),
- }
+ stable_payload = {
+ "content": content,
+ "memory_type": item.get("memory_type", "note"),
+ "project": item.get("project"),
+ "tags": item.get("tags"),
+ "importance": item.get("importance"),
+ "confidence_score": item.get("confidence_score"),
+ "outcome": item.get("outcome"),
+ "reversibility": item.get("reversibility"),
+ "files_changed": item.get("files_changed"),
+ "entity_id": item.get("entity_id"),
+ "status": item.get("status"),
+ "severity": item.get("severity"),
+ "file_path": item.get("file_path"),
+ "function_name": item.get("function_name"),
+ "line_number": item.get("line_number"),
+ "supersedes": item.get("supersedes"),
+ }
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/brainlayer/cli/__init__.py` around lines 2120 - 2133, The current
chunk_id generation uses a truncated SHA256 of stable_payload built from only
content, memory_type, project, tags, and importance, causing distinct legacy
rows to collide; update the stable_payload used for the hash (the object built
just before chunk_id) to include all unique legacy identifying fields from item
(e.g., severity, file_path, line_number, entity_id and any other provenance
fields present on item) so the hash covers those attributes before computing
chunk_id for the chunk_id construction code that references stable_payload and
item.
now = datetime.now(timezone.utc).isoformat()
metadata = {"memory_type": event.get("memory_type", "note")}
metadata.update(event.get("metadata") or {})
chunk_id = event.get("chunk_id") or f"manual-{uuid.uuid4().hex[:16]}"
existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone()
if existing:
    if str(existing[0]).strip() == content:
        return ApplyResult(chunk_id=chunk_id)
    return ApplyResult(collision_chunk_id=chunk_id)
tags = event.get("tags")
_insert_chunk(
    conn,
    {
        "id": chunk_id,
        "content": content,
        "metadata": json.dumps(metadata),
        "source_file": "brainlayer-queue",
        "project": event.get("project"),
        "content_type": event.get("memory_type", "note"),
        "value_type": "HIGH",
        "char_count": len(content),
        "source": event.get("source") or "manual",
        "created_at": now,
        "enriched_at": now,
        "summary": content[:200],
        "tags": json.dumps(tags) if tags else None,
        "importance": float(event["importance"]) if event.get("importance") is not None else None,
    },
)
supersedes = event.get("supersedes") or metadata.get("supersedes")
cols = _columns(conn, "chunks")
if supersedes and "superseded_by" in cols:
    conn.execute("UPDATE chunks SET superseded_by = ? WHERE id = ?", (chunk_id, supersedes))
    for vector_table in ("chunk_vectors", "chunk_vectors_binary"):
        if conn.execute(
            "SELECT name FROM sqlite_master WHERE type = 'table' AND name = ?", (vector_table,)
        ).fetchone():
            conn.execute(f"DELETE FROM {vector_table} WHERE chunk_id = ?", (supersedes,))
entity_id = event.get("entity_id") or metadata.get("entity_id")
Don't let a bad importance value poison the queue file.
float(event["importance"]) will raise on malformed producer input, roll back the transaction, and leave the same .jsonl on disk to fail again on every drain pass. This path should coerce defensively and either drop the field or skip just that event.
Suggested fix
metadata = {"memory_type": event.get("memory_type", "note")}
metadata.update(event.get("metadata") or {})
chunk_id = event.get("chunk_id") or f"manual-{uuid.uuid4().hex[:16]}"
+ raw_importance = event.get("importance")
+ importance = None
+ if raw_importance is not None:
+ try:
+ importance = float(raw_importance)
+ except (TypeError, ValueError):
+ logger.warning("Ignoring invalid importance=%r for chunk_id=%s", raw_importance, chunk_id)
existing = conn.execute("SELECT content FROM chunks WHERE id = ?", (chunk_id,)).fetchone()
@@
- "importance": float(event["importance"]) if event.get("importance") is not None else None,
+ "importance": importance,🧰 Tools
🪛 OpenGrep (1.20.0)
[ERROR] 144-144: SQL query built via f-string passed to execute()/executemany(). Use parameterized queries with placeholders instead.
(coderabbit.sql-injection.python-fstring-execute)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/brainlayer/drain.py` around lines 107 - 145, The code currently does
float(event["importance"]) during chunk insertion which will raise on malformed
input and abort the drain; update the logic around the "importance" field (in
the block that builds the dict for _insert_chunk in drain.py) to coerce
defensively: attempt to parse event.get("importance") inside a try/except
catching ValueError/TypeError and on failure set importance to None (or simply
omit the key) so a bad value won't throw and roll back the transaction; ensure
this change is applied where _insert_chunk is called and does not re-raise so
only the bad event is skipped or inserted with no importance instead of
poisoning the queue.
if should_embed:
    _embed_store_chunks(conn, store_chunk_ids, embed_fn)
conn.execute("COMMIT")
Commit the batch before calling the embedder.
_embed_store_chunks() runs while BEGIN IMMEDIATE is still open, so any slow or remote embedding call holds the SQLite write lock for the full embedding latency. Under arbitration, that turns embed slowness into write contention for every producer.
Suggested fix
- if should_embed:
- _embed_store_chunks(conn, store_chunk_ids, embed_fn)
conn.execute("COMMIT")
+ if should_embed and store_chunk_ids:
+ embed_conn = _open_connection(db_path)
+ try:
+ _embed_store_chunks(embed_conn, store_chunk_ids, embed_fn)
+ finally:
+ embed_conn.close()
As per coding guidelines, "Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior."
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/brainlayer/drain.py` around lines 416 - 418, Currently the code calls
_embed_store_chunks(conn, store_chunk_ids, embed_fn) while the transaction is
still open, holding the SQLite write lock; change the order so the transaction
is committed before invoking the embedder: move or duplicate
conn.execute("COMMIT") to occur before calling _embed_store_chunks (i.e., call
conn.execute("COMMIT") when should_embed is true and only then call
_embed_store_chunks), ensuring the BEGIN IMMEDIATE/transaction is closed before
any slow/remote embedding work; update any surrounding logic that assumes conn
is usable after commit and add a brief comment flagging this as a risky
DB/concurrency change.
def _enqueue_enrichment_write(chunk: dict[str, Any], enrichment: dict[str, Any]) -> None:
    from .queue_io import enqueue_enrichment_update

    content = chunk.get("content", "")
    try:
        enqueue_enrichment_update(
            chunk_id=chunk["id"],
            enrichment=enrichment,
            content_hash=_content_hash(content) if content else None,
            entities=enrichment.get("entities", []),
        )
Preserve the target store in the queued enrichment intent.
This helper drops the store/db_path context that the direct write path had. In arbitrated mode, enrich_single, enrich_realtime, and enrich_batch can still read from an arbitrary store, but the queued payload here only contains chunk_id and enrichment fields, so the drain has no way to know which SQLite DB should be updated. That can silently route writes to the default DB instead of the store that produced the chunk.
As per coding guidelines, "Flag risky DB or concurrency changes explicitly and do not hand-wave lock behavior".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/brainlayer/enrichment_controller.py` around lines 171 - 181, The queued
enrichment helper _enqueue_enrichment_write currently drops the chunk's storage
context so the drain cannot pick the correct SQLite DB; modify the call to
enqueue_enrichment_update to pass through the store/db_path from the chunk
(e.g., chunk.get("store") or chunk.get("db_path")) alongside chunk_id,
enrichment, content_hash and entities so the queued intent includes the target
DB; update any callers or the enqueue_enrichment_update signature if needed to
accept and forward this store/db_path field to the drain to ensure writes go to
the originating store.
| if os.environ.get("BRAINLAYER_ARBITRATED") == "1": | ||
| _queue_store( | ||
| { | ||
| "content": content, | ||
| "memory_type": memory_type, | ||
| "project": _normalize_project_name(project), | ||
| "tags": tags, | ||
| "importance": importance, | ||
| "confidence_score": confidence_score, | ||
| "outcome": outcome, | ||
| "reversibility": reversibility, | ||
| "files_changed": files_changed, | ||
| "entity_id": entity_id, | ||
| "status": status, | ||
| "severity": severity, | ||
| "file_path": file_path, | ||
| "function_name": function_name, | ||
| "line_number": line_number, | ||
| "supersedes": supersedes, | ||
| } | ||
| ) | ||
| structured = {"chunk_id": "queued", "related": []} | ||
| return ([TextContent(type="text", text=format_store_result("queued", queued=True))], structured) |
Keep supersedes intact when the arbitrated path falls back.
This branch now queues supersedes, but _queue_store(...) can still fall back to pending-stores.jsonl, and _flush_pending_stores() replays those items via store_memory(...) without forwarding supersedes. That means the new arbitrated fast path loses atomic store-and-replace semantics exactly on the failure path it is supposed to make durable.
As per coding guidelines, "Add supersedes parameter to brain_store for atomic store-and-replace operations".
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/brainlayer/mcp/store_handler.py` around lines 503 - 525, The queued-item
path must preserve the supersedes field so a fallback replay preserves atomic
replace; update the replay path in _flush_pending_stores to read the stored
record's "supersedes" and pass it through to store_memory (and any intermediate
call like brain_store/store_memory wrappers) when replaying pending-stores.jsonl
entries; ensure function signatures used in replay accept and forward supersedes
(e.g., store_memory, brain_store) so the atomic store-and-replace semantics are
retained on failure paths.
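A sketch of the replay-side change: forward the recorded supersedes when replaying pending-stores.jsonl. store_memory's real signature is not shown in this thread, so it is passed in as a callable and the keyword is assumed.

```python
import json

def _replay_pending_stores(pending_lines, store_memory) -> None:
    """Replay queued fallback items, forwarding supersedes instead of dropping it."""
    for raw_line in pending_lines:
        item = json.loads(raw_line)
        store_memory(
            content=item["content"],
            memory_type=item.get("memory_type", "note"),
            project=item.get("project"),
            tags=item.get("tags"),
            importance=item.get("importance"),
            supersedes=item.get("supersedes"),  # the field the comment says gets lost on replay
        )
```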
final_path = resolved_dir / f"{safe_source}-{now_ms}-{uuid.uuid4().hex}.jsonl"
tmp_path = final_path.with_suffix(".tmp")
tmp_path.write_text(json.dumps(event, ensure_ascii=True) + "\n", encoding="utf-8")
tmp_path.replace(final_path)
replace() alone does not make this queue durable.
A crash after the rename can still lose the just-enqueued event because neither the temp file nor the parent directory is fsynced. Since this queue is now the write buffer for arbitrated writes, that breaks the “durable JSONL” guarantee.
Suggested fix
- tmp_path.write_text(json.dumps(event, ensure_ascii=True) + "\n", encoding="utf-8")
- tmp_path.replace(final_path)
+ payload = json.dumps(event, ensure_ascii=True) + "\n"
+ with tmp_path.open("w", encoding="utf-8") as handle:
+ handle.write(payload)
+ handle.flush()
+ os.fsync(handle.fileno())
+ tmp_path.replace(final_path)
+ dir_fd = os.open(resolved_dir, os.O_RDONLY)
+ try:
+ os.fsync(dir_fd)
+ finally:
+ os.close(dir_fd)
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/brainlayer/queue_io.py` around lines 36 - 39, The current write uses
tmp_path.write_text(...) then tmp_path.replace(final_path) which can lose data
on crash because neither the file contents nor the containing directory are
fsynced; modify the sequence around tmp_path and final_path so you open the temp
file (e.g. tmp_path.open("wb")), write bytes, call file.flush() and
os.fsync(file.fileno()), then atomically move the temp to final (os.replace or
tmp_path.replace), and afterward open the parent directory
(os.open(final_path.parent, os.O_DIRECTORY)) and call os.fsync(dir_fd) before
closing it; update the code paths that currently call tmp_path.write_text and
tmp_path.replace to use this safe write+fsync+dir fsync pattern.
return enqueue_jsonl(
    {
        "kind": "hook_chunk",
        "session_id": session_id,
        "content": content,
        "content_hash": content_hash,
        "project": project,
        "timestamp": timestamp or time.time(),
    },
Preserve explicit 0 timestamps.
timestamp or time.time() treats 0.0 as missing and rewrites the event time. Use an is None check here so valid falsy timestamps survive round-tripping.
Suggested fix
- "timestamp": timestamp or time.time(),
+ "timestamp": time.time() if timestamp is None else timestamp,📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
| return enqueue_jsonl( | |
| { | |
| "kind": "hook_chunk", | |
| "session_id": session_id, | |
| "content": content, | |
| "content_hash": content_hash, | |
| "project": project, | |
| "timestamp": timestamp or time.time(), | |
| }, | |
| return enqueue_jsonl( | |
| { | |
| "kind": "hook_chunk", | |
| "session_id": session_id, | |
| "content": content, | |
| "content_hash": content_hash, | |
| "project": project, | |
| "timestamp": time.time() if timestamp is None else timestamp, | |
| }, |
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@src/brainlayer/queue_io.py` around lines 117 - 125, The code currently sets
the "timestamp" field using "timestamp or time.time()", which treats 0 (and
other falsy but valid values) as missing; change this to use an explicit None
check so falsy timestamps like 0 are preserved (e.g., set "timestamp" to
timestamp if timestamp is not None else time.time()); update the dict passed to
enqueue_jsonl (the "timestamp" key) accordingly so enqueue_jsonl receives the
correct timestamp value.
def _connect_apsw(path: Path) -> apsw.Connection:
    conn = apsw.Connection(str(path))
    conn.enableloadextension(True)
    conn.loadextension(sqlite_vec.loadable_path())
    conn.enableloadextension(False)
    return conn
🧩 Analysis chain
🌐 Web query:
APSW Connection context manager __exit__ close connection or only commit transaction
💡 Result:
In APSW, using a Connection as a context manager (with conn:) does not close the database connection on exit; it manages the pending transaction only: if the with-block exits without an exception, the pending transaction is committed; if an exception occurred, the transaction is rolled back. [1][2] So, exit commits/rolls back but connection.close is not implied by the context-manager exit. You should close the connection yourself (or use contextlib.closing / an equivalent) if you need it closed deterministically. [3][2]
Citations:
- 1: https://lira.no-ip.org:8443/doc//python-apsw/html/connection.html
- 2: https://rogerbinns.github.io/apsw/connection.html
- 3: https://stackoverflow.com/questions/58471206/does-connection-exit-close-itself-in-sqlite3
🏁 Script executed:
# First, verify the _connect_apsw function exists and matches the snippet
head -100 tests/test_arbitration.py | tail -30
Repository: EtanHey/brainlayer
Length of output: 970
🏁 Script executed:
# Find all usages of _connect_apsw to verify the with-statement pattern
rg "_connect_apsw" tests/test_arbitration.py -A 1 -B 1 -nRepository: EtanHey/brainlayer
Length of output: 1808
🏁 Script executed:
# Check _create_vec_db to see how it uses _connect_apsw
rg "_create_vec_db" tests/test_arbitration.py -A 15 -B 2Repository: EtanHey/brainlayer
Length of output: 1189
APSW with statement does not close the connection — leaks across every test.
_connect_apsw returns a raw apsw.Connection, and all call sites use it as with _connect_apsw(...) as conn: (lines 147, 195, 226, 254, 279, 297, 315, 350, 374). Per APSW documentation, the connection context manager only manages a transaction (commit on success, rollback on exception); it does not close the connection. Each invocation therefore leaks the underlying SQLite handle and any WAL/SHM mappings until GC eventually runs __del__. In WAL mode this can keep read locks and file descriptors live across tests, mask the very concurrency behavior under test, and pollute later tests (especially the 3-producer load test).
Recommend converting the helper into a proper closing context manager so all existing with ... as conn: usages do the right thing.
🛠️ Proposed fix — wrap as a closing context manager
+from contextlib import contextmanager
@@
-def _connect_apsw(path: Path) -> apsw.Connection:
-    conn = apsw.Connection(str(path))
-    conn.enableloadextension(True)
-    conn.loadextension(sqlite_vec.loadable_path())
-    conn.enableloadextension(False)
-    return conn
+@contextmanager
+def _connect_apsw(path: Path):
+    conn = apsw.Connection(str(path))
+    try:
+        conn.enableloadextension(True)
+        conn.loadextension(sqlite_vec.loadable_path())
+        conn.enableloadextension(False)
+        yield conn
+    finally:
+        conn.close()

_create_vec_db (line 91) will need updating to with _connect_apsw(path) as conn: as well.
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/test_arbitration.py` around lines 82-87, the helper _connect_apsw currently returns a raw apsw.Connection; when used in tests as "with _connect_apsw(...) as conn:" the context manager only manages transactions and does not close the connection, leaking SQLite handles. Change _connect_apsw into a proper closing context manager (e.g., use contextlib.contextmanager or return an object with __enter__/__exit__) that creates the apsw.Connection, enables/loads extensions as before, yields the connection, and always calls conn.close() in the exit/finally path. Update any helper callers like _create_vec_db (and all tests already using "with _connect_apsw(...) as conn:") to rely on the new context manager so connections are closed after each with-block.
stale_lock = queue_dir / ".drain.lock"
stale_lock.write_text("stale", encoding="utf-8")
stale_lock.unlink()

assert drain_once(db_path=db_path, queue_dir=queue_dir) == 1
assert not stale_lock.exists()
Test does not exercise the scenario its name describes.
stale_lock is written and then immediately removed via unlink() before drain_once runs, so the drain never observes a lock sentinel at all. The subsequent assert not stale_lock.exists() is trivially true (the test itself just deleted the file). As written, this is indistinguishable from a plain "drain processes one item" test and provides no signal about lock-file robustness.
If the intent is "drain still works when a previous stale .drain.lock file is present", leave the file in place and let drain handle it. If the intent is "drain does not depend on a file at that path for mutual exclusion" (i.e., uses flock/in-memory locking on a different artifact), the test needs to assert that drain succeeds even while another holder believes it owns the lock — which requires a second process/thread, not a write-then-delete dance.
🛠️ Suggested fix — actually leave the stale lock in place
  stale_lock = queue_dir / ".drain.lock"
  stale_lock.write_text("stale", encoding="utf-8")
- stale_lock.unlink()
  assert drain_once(db_path=db_path, queue_dir=queue_dir) == 1
  assert not stale_lock.exists()

📝 Committable suggestion
‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.
stale_lock = queue_dir / ".drain.lock"
stale_lock.write_text("stale", encoding="utf-8")
stale_lock.unlink()
assert drain_once(db_path=db_path, queue_dir=queue_dir) == 1
assert not stale_lock.exists()

stale_lock = queue_dir / ".drain.lock"
stale_lock.write_text("stale", encoding="utf-8")
assert drain_once(db_path=db_path, queue_dir=queue_dir) == 1
assert not stale_lock.exists()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@tests/test_arbitration.py` around lines 343-348, the test currently creates stale_lock and then immediately unlinks it, so drain_once never sees a sentinel. Remove the premature deletion and let drain_once handle the stale file: create stale_lock = queue_dir / ".drain.lock", write the sentinel, do not call stale_lock.unlink() before invoking drain_once(db_path=db_path, queue_dir=queue_dir), then assert drain_once(...) == 1 and assert not stale_lock.exists() afterwards so the test verifies that drain_once detects and removes a stale .drain.lock. Reference the stale_lock variable and the drain_once(...) call to locate the change.
| "tags": tags, | ||
| }, | ||
| ) | ||
|
|
Meta-research arbitration incorrectly clears entity JSON data
Medium Severity
_enqueue_meta_research_write calls _enqueue_enrichment_write which always passes entities=enrichment.get("entities", []). For meta-research, the enrichment dict lacks an "entities" key, so an empty list is passed. The drain's _apply_enrichment then sets raw_entities_json = "[]" because the value is not None. The direct-write path (_mark_meta_research) never touches raw_entities_json, so the arbitrated path introduces unintended data loss of existing entity associations.
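If this still reproduces, one direction for a fix is to make the queue payload distinguish "no entities computed" from "entities computed as empty", and have the drain overwrite only when a value was actually supplied. The sketch below is illustrative only: queuing is reduced to a plain list, sqlite3 stands in for the project's APSW connection, and the chunks.raw_entities_json column name is taken from this finding rather than verified against the schema.

import json
import sqlite3

def enqueue_enrichment_event(queue: list, chunk_id: str, enrichment: dict) -> None:
    event = {"type": "enrichment", "chunk_id": chunk_id, "enrichment": enrichment}
    # Only carry "entities" when the producer computed them; a meta-research
    # enrichment that lacks the key omits it entirely instead of sending [].
    if "entities" in enrichment:
        event["entities"] = enrichment["entities"]
    queue.append(event)  # stand-in for the real enqueue call

def apply_enrichment(conn: sqlite3.Connection, event: dict) -> None:
    entities = event.get("entities")
    if entities is not None:
        # Overwrite only when a value was supplied; an absent key means
        # "preserve whatever raw_entities_json already holds".
        conn.execute(
            "UPDATE chunks SET raw_entities_json = ? WHERE id = ?",
            (json.dumps(entities), event["chunk_id"]),
        )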
🟢 Low
When BRAINLAYER_ARBITRATED=1, the supersedes parameter is queued in the pending store dict but _flush_pending_stores never extracts it, so the supersede relationship is silently lost and the old chunk remains unmarked when the queue flushes.
🚀 Reply "fix it for me" or copy this AI Prompt for your agent:
In file src/brainlayer/mcp/store_handler.py around line 447:
When `BRAINLAYER_ARBITRATED=1`, the `supersedes` parameter is queued in the pending store dict but `_flush_pending_stores` never extracts it, so the supersede relationship is silently lost and the old chunk remains unmarked when the queue flushes.
Evidence trail:
src/brainlayer/mcp/store_handler.py lines 515-537 (queuing with supersedes at line 532), src/brainlayer/mcp/store_handler.py lines 425-470 (_flush_pending_stores calling store_memory without supersedes), src/brainlayer/store.py lines 47-64 (store_memory signature lacks supersedes param), src/brainlayer/mcp/store_handler.py lines 567-570 (supersede_chunk called separately in non-queued path)
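If the finding holds, the flush path needs to pull supersedes out of each pending item and apply it after the store succeeds, mirroring the non-queued path. A rough sketch only: it assumes store_memory returns the new chunk id and that a supersede_chunk(old_id, new_id) helper exists, neither of which is confirmed here.

def flush_pending_stores(store, pending: list[dict]) -> None:
    for item in pending:
        new_chunk_id = store.store_memory(
            content=item["content"],
            tags=item.get("tags", []),
            importance=item.get("importance"),
        )
        old_chunk_id = item.get("supersedes")
        if old_chunk_id:
            # Mirror the non-queued path, which calls supersede_chunk
            # separately after the new memory is written.
            store.supersede_chunk(old_chunk_id, new_chunk_id)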
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 64110e7e8d
for attempt in range(5):
    conn = _open_connection(db_path)
    attempt_drained = 0
    collision_ids: list[str] = []
    store_chunk_ids: list[str] = []
    try:
        conn.execute("BEGIN IMMEDIATE")
Batch queue files in one SQLite transaction
When the queue contains many normal enqueue_jsonl writes, each event is its own .jsonl file, but this loop opens a fresh APSW connection, loads sqlite-vec, and runs BEGIN IMMEDIATE/COMMIT once per file instead of once per batch_size. In bursty producer scenarios the drain becomes transaction/setup-bound and falls behind even with batch_size=250 (the concurrent producer test only drained 2000 of 3000 items before its deadline), leaving arbitrated writes queued much longer than intended.
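One way to address this is to reuse a single connection and commit once per batch rather than once per file, deleting files only after the batch commits. This is a sketch under assumptions (sqlite3 with isolation_level=None in place of APSW, a caller-supplied apply_event, one JSON event per line per file), not the drain's actual structure.

import json
import sqlite3
from pathlib import Path

def drain_batched(db_path: Path, queue_dir: Path, apply_event, batch_size: int = 250) -> int:
    # isolation_level=None puts sqlite3 in autocommit mode so the explicit
    # BEGIN IMMEDIATE / COMMIT below control the transaction boundaries.
    conn = sqlite3.connect(db_path, isolation_level=None)
    drained = 0
    try:
        files = sorted(queue_dir.glob("*.jsonl"))
        for start in range(0, len(files), batch_size):
            batch = files[start:start + batch_size]
            conn.execute("BEGIN IMMEDIATE")      # one write transaction per batch
            try:
                for path in batch:
                    for line in path.read_text(encoding="utf-8").splitlines():
                        apply_event(conn, json.loads(line))
                conn.execute("COMMIT")
            except Exception:
                conn.execute("ROLLBACK")
                raise
            for path in batch:                   # delete only after a successful commit
                path.unlink(missing_ok=True)
            drained += len(batch)
    finally:
        conn.close()
    return drained

Deleting files only after COMMIT keeps crash behavior close to the per-file version: a crash mid-batch re-drains the whole batch, so per-event application still needs to be idempotent (e.g., via the content-hash collision handling the drain already logs).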
Cursor Bugbot has reviewed your changes and found 1 potential issue.
There are 2 total unresolved issues (including 1 from previous review).
    time.sleep(delay)
    continue
_log(log_path, f"drain failed for {path.name}: {exc}")
break
Non-BusyError drain failures leave queue files permanently stuck
Medium Severity
When a non-BusyError exception occurs during event processing inside drain_once (e.g., ValueError from a non-numeric importance value, or any data-level error), the except branch logs the failure and breaks without quarantining or deleting the file. The file stays as *.jsonl in the queue directory and is re-selected by sorted(queue_dir.glob("*.jsonl")) on every subsequent drain cycle, failing identically each time. Read errors (UnicodeDecodeError/OSError) are correctly quarantined via _quarantine_file, but processing-level exceptions are not, creating a permanently stuck file.
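If that is still the behavior, the data-level branch could quarantine the same way the read-error path does, so only lock contention is retried and everything else is parked instead of being re-selected forever. A sketch with assumed helpers (a per-file apply_events callable and a quarantine callable mirroring _quarantine_file), with sqlite3's OperationalError standing in for apsw.BusyError:

import sqlite3
import time
from pathlib import Path

def process_queue_file(conn: sqlite3.Connection, path: Path, apply_events, quarantine,
                       retries: int = 5, delay: float = 0.2) -> bool:
    # conn is assumed to be opened with isolation_level=None (autocommit),
    # so the explicit BEGIN IMMEDIATE / COMMIT below are what hold the lock.
    for attempt in range(retries):
        try:
            conn.execute("BEGIN IMMEDIATE")
            apply_events(conn, path)             # caller-supplied per-file applier
            conn.execute("COMMIT")
            path.unlink(missing_ok=True)
            return True
        except sqlite3.OperationalError:
            # Lock contention (the apsw.BusyError analogue): back off and retry.
            if conn.in_transaction:
                conn.execute("ROLLBACK")
            time.sleep(delay)
            continue
        except Exception as exc:
            # Data-level failure (bad JSON, type errors, constraint violations):
            # retrying will fail identically, so park the file instead of
            # leaving it to be re-selected on every drain cycle.
            if conn.in_transaction:
                conn.execute("ROLLBACK")
            quarantine(path, reason=str(exc))
            return False
    quarantine(path, reason="exceeded lock retries")
    return False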
Summary
- BRAINLAYER_ARBITRATED=1, while a single drain daemon drains the unified queue with BEGIN IMMEDIATE batches.
- com.brainlayer.drain, com.brainlayer.watch, and weekly com.brainlayer.repair-fts; installer now loads drain before watch/enrichment and creates required queue/log dirs.
- VectorStore._init_db; explicit repair now lives behind BRAINLAYER_REPAIR=1 or brainlayer repair-fts.
- brainlayer flush to migrate legacy pending-stores.jsonl into the unified queue and drain unified queue files.
Scope notes / deviations
- watcher_bridge.py, not watcher.py; launchd sets BRAINLAYER_ARBITRATED=1 for the new watch agent.
- VectorStore for read-side candidate selection, but enrichment/meta-research writes enqueue when arbitration is enabled.
- ~/.brainlayer/queue; this PR moves hooks/indexer.py to the shared helper and makes the drain support both the legacy hook shape and the new typed queue shape.
Tests
- pytest tests/test_arbitration.py -q failed with ModuleNotFoundError: No module named 'scripts.drain_daemon'.
- 1 passed in 1.89s
- 1 passed in 1.72s
- 1 passed in 1.78s
- pytest tests/test_arbitration.py tests/test_write_queue.py tests/test_search_trigram_fts.py -q -> 18 passed in 6.04s
- ruff check ... -> All checks passed!
- ./scripts/run_tests.sh -> BrainLayer test gate passed with 1829 passed, 9 skipped, 75 deselected, 1 xfailed, 102 warnings, plus MCP registration, isolated eval/hook routing, bun, and FTS regression passed.
Review
- supersedes, log directory creation, PYTHON_BIN validation, enqueue error propagation, malformed queue handling, and the drain file-deletion lock race.
Note
Medium Risk
Changes core write paths to optionally enqueue and drain SQLite writes; misconfiguration (e.g., drain daemon not running) could cause write backlog or delayed embeddings/FTS repair.
Overview
Introduces a unified durable per-file JSONL write queue (via queue_io.py) and a new single-writer drain implementation (drain.py) that applies queued store/watcher/hook/enrichment events under BEGIN IMMEDIATE, with retries, poison-file quarantine, collision logging, and optional post-drain embedding.
Updates background producers to support arbitration mode (BRAINLAYER_ARBITRATED=1): MCP brain_store can validate then enqueue (and still falls back to legacy pending-stores.jsonl on queue failure), the watcher and enrichment paths enqueue writes instead of writing directly, and brainlayer flush migrates legacy pending stores into the new queue and drains it.
Removes automatic trigram FTS backfill/repair from VectorStore startup, replacing it with an explicit brainlayer repair-fts command and a VectorStore.repair_fts() method (also gated by BRAINLAYER_REPAIR=1), plus new/updated launchd plists and installer support for watch, drain, and weekly repair-fts jobs; adds documentation and tests covering concurrency, embedding, collisions, and migration behavior.
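For context on the "durable per-file JSONL" shape described above, an enqueue of that style typically writes a temp file, fsyncs it, and atomically renames it into the queue directory so the drain only ever sees complete events. A minimal sketch; the file-naming scheme and fsync policy here are assumptions, not queue_io.py's actual implementation:

import json
import os
import time
import uuid
from pathlib import Path

def enqueue_jsonl_sketch(queue_dir: Path, event: dict) -> Path:
    # One event per file, published via write-temp-then-rename.
    queue_dir.mkdir(parents=True, exist_ok=True)
    name = f"{time.time_ns()}-{uuid.uuid4().hex}"   # sortable, collision-free name
    tmp_path = queue_dir / f"{name}.tmp"
    final_path = queue_dir / f"{name}.jsonl"
    with open(tmp_path, "w", encoding="utf-8") as fh:
        fh.write(json.dumps(event, ensure_ascii=False) + "\n")
        fh.flush()
        os.fsync(fh.fileno())                        # make the payload durable
    os.replace(tmp_path, final_path)                 # atomic publish to the drain
    return final_path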
Note
Arbitrate BrainLayer DB writes through a durable single-writer queue and drain daemon
- BRAINLAYER_ARBITRATED=1 to switch from direct writes to enqueuing.
- brainlayer repair-fts CLI command and a weekly launchd job (com.brainlayer.repair-fts.plist).
- brainlayer flush now migrates legacy pending-stores.jsonl entries into the unified queue and drains in-process.
- BRAINLAYER_ARBITRATED=1 no longer writes to the DB synchronously; writes are lost if the queue directory is unavailable and enqueue throws (see the sketch below).
Macroscope summarized 64110e7.
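Given that last point, producers on the arbitrated path may want a guard of roughly this shape so a failed enqueue degrades to the legacy direct write rather than dropping the memory. Both callables here are placeholders for whatever the caller already has (for example, the pending-stores.jsonl fallback mentioned above):

from pathlib import Path

def store_with_fallback(event: dict, queue_dir: Path, enqueue, write_direct) -> str:
    try:
        enqueue(queue_dir, event)       # preferred: durable queue for the drain daemon
        return "queued"
    except OSError:
        # Queue directory missing or unwritable: fall back to the legacy
        # direct path so the write is not silently lost.
        write_direct(event)
        return "direct"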
Summary by CodeRabbit
New Features
Bug Fixes / Reliability
Tests
Docs