feat: tiered enrichment selectivity — T0/T1/T2/T3 classifier#82
Conversation
@codex review
📝 Walkthrough

This PR introduces a tiered enrichment system for prioritizing chunk processing, implements LRU caching for hybrid search results, adds input validation to the search handler, refactors the cloud backfill script with APSW-based sidecar checkpointing, and includes comprehensive test coverage for new features and validation scenarios.

Estimated code review effort: 🎯 5 (Critical) | ⏱️ ~110 minutes
🚥 Pre-merge checks: ✅ 3 passed
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 9434be4cec
```python
DEFAULT_DB_PATH = get_db_path()
EXPORT_DIR = Path(__file__).resolve().parent / "backfill_data"
CHECKPOINT_TABLE = "enrichment_checkpoints"
CHECKPOINT_DB_PATH = DEFAULT_DB_PATH.with_name("enrichment_checkpoints.db")
```
Derive the checkpoint sidecar path from the selected DB

`CHECKPOINT_DB_PATH` is fixed from `DEFAULT_DB_PATH` at import time, so even when the CLI passes a different `--db` path (see the runtime dispatch in `run_full_backfill`/`resume_backfill`), checkpoint reads and writes still target the default sidecar file. This mixes checkpoint state across databases and makes `--resume`/`--status` incorrect for non-default DBs.
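A minimal sketch of the suggested fix: compute the sidecar path from whatever DB path the CLI actually selected, instead of freezing it at import time (the helper name here is hypothetical, not from the repo):

```python
from pathlib import Path

CHECKPOINT_DB_NAME = "enrichment_checkpoints.db"

def checkpoint_db_path(db_path: Path) -> Path:
    """Place the checkpoint sidecar next to whichever DB was selected via --db."""
    return db_path.with_name(CHECKPOINT_DB_NAME)
```

Call sites in `run_full_backfill`/`resume_backfill` would pass their resolved `--db` path in, so `--resume`/`--status` read the checkpoint state belonging to that database.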
```
INSERT OR REPLACE INTO {CHECKPOINT_TABLE} ({', '.join(CHECKPOINT_COLUMNS)})
VALUES ({placeholders})
```
Prevent legacy migration from clobbering newer sidecar rows

The migration path runs on every `ensure_checkpoint_table` call and uses `INSERT OR REPLACE` for all legacy rows, which can overwrite newer sidecar statuses with stale values still stored in the main DB table. After one run updates a batch to completed/imported in the sidecar, a later startup can revert it back to an old submitted state and incorrectly treat finished jobs as pending.
Actionable comments posted: 8
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
scripts/cloud_backfill.py (1)
940-959: ⚠️ Potential issue | 🟠 Major

`--status` still hard-fails on a busy main DB.

The checkpoint query moved to the sidecar, but `show_status()` still constructs `VectorStore` before it reaches that block. If the main DB is locked, the status command never gets to the sidecar path even though it only needs read access plus best-effort stats.

Suggested change:

```diff
 def show_status(db_path: Path) -> None:
     """Show status of all checkpoint jobs."""
-    store = VectorStore(db_path)
+    store = open_backfill_store(db_path, allow_read_only_fallback=True)
     ensure_checkpoint_table(store)
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/cloud_backfill.py` around lines 940 - 959, show_status() constructs VectorStore (VectorStore, ensure_checkpoint_table) before opening the sidecar checkpoint DB (_open_checkpoint_conn) so a locked main DB can abort the --status path; make VectorStore creation lazy or move it after the sidecar branch so the function first opens the sidecar connection and runs _ensure_checkpoint_table_in_conn / SELECT on CHECKPOINT_TABLE (using _open_checkpoint_conn and _ensure_checkpoint_table_in_conn) and only creates VectorStore and calls ensure_checkpoint_table if/when the main DB is actually needed for stats; keep function names unchanged and avoid touching the sidecar read path so --status can succeed when the main DB is busy.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@scripts/cloud_backfill.py`:
- Around line 189-203: _migrate_checkpoints_from_main_db currently uses "INSERT
OR REPLACE" which can overwrite newer sidecar rows with stale main-db data;
change the SQL in _migrate_checkpoints_from_main_db to perform an insert-only
migration (e.g., use "INSERT OR IGNORE INTO {CHECKPOINT_TABLE} ({',
'.join(CHECKPOINT_COLUMNS)}) VALUES (...)" or an INSERT ... SELECT ... WHERE NOT
EXISTS pattern) so existing sidecar rows are not replaced, and apply the same
insert-only change to the other migration block that uses
CHECKPOINT_TABLE/CHECKPOINT_COLUMNS (the similar code around the second
migration you pointed out). Ensure the executemany call still supplies the same
rows but the SQL prevents replacing existing records so save_checkpoint updates
in the sidecar are preserved.
- Around line 252-292: The save_checkpoint function performs a non-atomic SELECT
then INSERT/UPDATE which causes a race if two processes check-then-insert
concurrently; change it to perform a single atomic upsert against
CHECKPOINT_TABLE (e.g. use SQLite's INSERT ... ON CONFLICT(batch_id) DO UPDATE
SET ... or REPLACE INTO) instead of the separate existence check, building the
column list and the conflict-target update clause from kwargs so the write never
fails with a unique-constraint error; keep the existing retry logic around the
single upsert execution and ensure the operation is executed within the same
connection/transaction and the connection is closed in the finally block.
- Around line 358-366: The type hints are too narrow: functions like
export_unenriched_chunks and _init_sanitizer (called from run_full_backfill
which can return ReadOnlyBackfillStore via open_backfill_store) should accept
either VectorStore or ReadOnlyBackfillStore; update their annotations to a union
(VectorStore | ReadOnlyBackfillStore) or define a small StoreProtocol that
declares the used members (.conn, .get_enrichment_stats, .close) and use that
protocol as the parameter type, then update open_backfill_store,
run_full_backfill, export_unenriched_chunks, and _init_sanitizer signatures to
use the new union/protocol so type-checkers accept both store types.
In `@src/brainlayer/pipeline/enrichment_tiers.py`:
- Around line 46-50: classify_chunk_tier currently ignores the content_type
parameter which lets low-signal "noise" pass through; update classify_chunk_tier
to branch on content_type and handle types per guidelines: skip/return no tier
for "noise"; preserve verbatim (do not summarize or alter) for "ai_code",
"stack_trace", and "user_message"; treat "build_log" by summarizing it before
classification; for "dir_listing" extract only structural info (filenames/paths)
and drop file contents, then classify; ensure you reference and change logic
inside the classify_chunk_tier function (and the sibling classification block
around lines marked 63-75) so content_type drives the flow.
- Around line 71-74: classify_chunk_tier() and get_tier_source_filter()
disagree: classify_chunk_tier treats non-claude sources (e.g., "unknown") as
eligible for T1/T2 based on recency, but get_tier_source_filter() only includes
"claude_code" for those tiers; pick one consistent semantic and implement it:
either expand get_tier_source_filter() to include the same source categories
classify_chunk_tier() treats as T1/T2 (e.g., include "unknown" and other
non-claude sources) or restrict classify_chunk_tier() to only return T1/T2 for
"claude_code"; update both occurrences noted (the classification branch around
classify_chunk_tier and the source filter logic in get_tier_source_filter,
including the similar logic at lines ~116-120) so both functions use the same
source set for T1/T2. Ensure tests/usage expecting T1/T2 selection reflect the
chosen rule.
In `@src/brainlayer/search_repo.py`:
- Line 27: The module-level OrderedDict _hybrid_cache is accessed
non-atomically; add a module-level lock (e.g., _hybrid_cache_lock =
threading.RLock()) and wrap every compound check/read/move/update sequence that
touches _hybrid_cache in a with _hybrid_cache_lock: block so operations (lookup,
move_to_end, insert, pop) are atomic; update all places that manipulate
_hybrid_cache (including the initial definition site and the other usage sites
referenced by the comment) to acquire the lock before reading/modifying and
release after, ensuring you still use OrderedDict.move_to_end/popitem as before
but under the lock.
- Around line 30-65: The cache key builder _hybrid_cache_key is missing the
query_embedding so hybrid_search results can be incorrectly reused; update
_hybrid_cache_key to accept a query_embedding parameter and include a stable
representation of it (e.g., a deterministic hash like SHA-256 of the embedding
bytes or a tuple of rounded floats) in the returned tuple, and update any
callers (e.g., hybrid_search) to pass the embedding into _hybrid_cache_key so
cache entries are unique per embedding.
In `@tests/test_enrichment_tiers.py`:
- Around line 4-6: The docstring header "T0 (IMMEDIATE): manual/digest sources
and high-signal types (decision, mistake)" overstates T0 by referencing
memory_type values not accepted by the classifier/test API; update the docstring
in tests/test_enrichment_tiers.py (the header line starting with "T0
(IMMEDIATE):") to remove the parenthetical "decision, mistake" or replace it
with wording that matches the actual contract (e.g., "manual/digest sources and
high-signal types" or "manual/digest sources where classifier does not accept
memory_type"), ensuring the test description matches the classifier/test API
inputs.
---
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
Run ID: fce696de-dfb8-4f06-a221-02eb49d1706a
📒 Files selected for processing (10)
- pyproject.toml
- scripts/cloud_backfill.py
- src/brainlayer/mcp/search_handler.py
- src/brainlayer/pipeline/enrichment_tiers.py
- src/brainlayer/search_repo.py
- tests/conftest.py
- tests/test_cloud_backfill.py
- tests/test_enrichment_tiers.py
- tests/test_eval_baselines.py
- tests/test_search_validation.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: test (3.11)
- GitHub Check: test (3.13)
- GitHub Check: test (3.12)
🧰 Additional context used
📓 Path-based instructions (2)
src/brainlayer/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
`src/brainlayer/**/*.py`: Python package structure should follow the layout: `src/brainlayer/` for package code, with separate modules for `vector_store.py`, `embeddings.py`, `daemon.py`, `dashboard/`, and `mcp/` for different concerns
Use `paths.py:get_db_path()` for all database path resolution instead of hardcoding paths; support environment variable overrides and canonical path fallback (`~/.local/share/brainlayer/brainlayer.db`)
Lint and format Python code using `ruff check src/` and `ruff format src/`
Preserve verbatim content for `ai_code`, `stack_trace`, and `user_message` message types during classification and chunking; skip `noise` content entirely; summarize `build_log` content; extract structure-only for `dir_listing`
Use AST-aware chunking with tree-sitter; never split stack traces; mask large tool output during chunking
Handle SQLite concurrency by implementing retry logic on `SQLITE_BUSY` errors; ensure each worker uses its own database connection
Prioritize MLX (`Qwen2.5-Coder-14B-Instruct-4bit`) on Apple Silicon (port 8080) as the enrichment backend; fall back to Ollama (`glm-4.7-flash` on port 11434) after 3 consecutive MLX failures; support backend override via `BRAINLAYER_ENRICH_BACKEND` environment variable
Brain graph API must expose endpoints: `/brain/graph`, `/brain/node/{node_id}` (FastAPI)
Backlog API must support endpoints: `/backlog/items` with GET, POST, PATCH, DELETE operations (FastAPI)
Provide `brainlayer brain-export` command to export brain graph as JSON for dashboard consumption
Provide `brainlayer export-obsidian` command to export as Markdown vault with backlinks and tags
For bulk database operations: stop enrichment workers first, checkpoint WAL before and after operations, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks with checkpoint every 3 batches, never delete from `chunks` while FTS trigger is active
Files:
- src/brainlayer/mcp/search_handler.py
- src/brainlayer/pipeline/enrichment_tiers.py
- src/brainlayer/search_repo.py
src/brainlayer/mcp/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
MCP tools must implement: `brain_search`, `brain_store`, `brain_recall`, `brain_entity`, `brain_expand`, `brain_update`, `brain_digest`, `brain_get_person` with legacy `brainlayer_*` aliases for backward compatibility
Files:
src/brainlayer/mcp/search_handler.py
🧠 Learnings (5)
📓 Common learnings
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T14:22:54.809Z
Learning: Applies to src/brainlayer/**/*.py : For bulk database operations: stop enrichment workers first, checkpoint WAL before and after operations, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks with checkpoint every 3 batches, never delete from `chunks` while FTS trigger is active
📚 Learning: 2026-03-12T14:22:54.809Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T14:22:54.809Z
Learning: Applies to src/brainlayer/mcp/**/*.py : MCP tools must implement: `brain_search`, `brain_store`, `brain_recall`, `brain_entity`, `brain_expand`, `brain_update`, `brain_digest`, `brain_get_person` with legacy `brainlayer_*` aliases for backward compatibility
Applied to files:
src/brainlayer/mcp/search_handler.py
📚 Learning: 2026-03-12T14:22:54.809Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T14:22:54.809Z
Learning: Applies to src/brainlayer/**/*.py : Preserve verbatim content for `ai_code`, `stack_trace`, and `user_message` message types during classification and chunking; skip `noise` content entirely; summarize `build_log` content; extract structure-only for `dir_listing`
Applied to files:
src/brainlayer/pipeline/enrichment_tiers.py
📚 Learning: 2026-03-12T14:22:54.809Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T14:22:54.809Z
Learning: Applies to src/brainlayer/**/*.py : Prioritize MLX (`Qwen2.5-Coder-14B-Instruct-4bit`) on Apple Silicon (port 8080) as the enrichment backend; fall back to Ollama (`glm-4.7-flash` on port 11434) after 3 consecutive MLX failures; support backend override via `BRAINLAYER_ENRICH_BACKEND` environment variable
Applied to files:
src/brainlayer/pipeline/enrichment_tiers.py
📚 Learning: 2026-03-12T14:22:54.809Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-12T14:22:54.809Z
Learning: Applies to src/brainlayer/**/*.py : For bulk database operations: stop enrichment workers first, checkpoint WAL before and after operations, drop FTS triggers before bulk deletes, batch deletes in 5-10K chunks with checkpoint every 3 batches, never delete from `chunks` while FTS trigger is active
Applied to files:
scripts/cloud_backfill.py
🔇 Additional comments (7)

pyproject.toml (1)

107-110: Good pytest path precedence change. `pythonpath = ["src"]` is a solid fix for source-first imports in local/dev test runs.

tests/conftest.py (1)

17-25: Nice isolation fixture for eval tests. Unique per-invocation project names are the right approach to avoid cross-case data leakage.

src/brainlayer/mcp/search_handler.py (1)

130-135: Good boundary validation at the MCP edge. Rejecting invalid `detail`/`num_results` before routing is the right behavior and prevents silent fallback/clamping.

tests/test_search_validation.py (1)

23-29: Strong regression coverage for validation and cache behavior. These tests are well targeted and materially reduce risk for the new search boundary + warm-cache path.

Also applies to: 31-152, 303-411

tests/test_eval_baselines.py (1)

931-936: No action needed — `get_db_path()` is explicitly typed to return `Path`. The function signature at `src/brainlayer/paths.py:14` declares `def get_db_path() -> Path:` and both implementations (lines 21 and 24) return `Path` objects. Calling `.exists()` on the result is safe and correct.

> Likely an incorrect or invalid review comment.

scripts/cloud_backfill.py (1)

111-134: Nice extraction of the batch payload and pricing helpers. Centralizing `thinkingBudget=0` and the batch-rate math makes it much harder for the export path and its regression tests to drift.

tests/test_cloud_backfill.py (1)

38-250: Good regression coverage for the new sidecar flow. These tests hit the highest-risk additions here: `thinkingBudget=0`, discounted cost math, main→sidecar migration, export reuse, and read-only fallback.
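For context, the pytest change praised above is a small pyproject fragment along these lines (a sketch; only the `pythonpath` key is confirmed by the comment, the section layout is standard pytest configuration):

```toml
[tool.pytest.ini_options]
# Resolve imports from the working tree before any installed copy of the package.
pythonpath = ["src"]
```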
```python
def _migrate_checkpoints_from_main_db(store: Optional[VectorStore], conn: apsw.Connection) -> int:
    """Copy legacy checkpoint rows into the sidecar DB so state is preserved."""
    rows = _main_checkpoint_rows(store)
    if not rows:
        return 0

    placeholders = ", ".join("?" for _ in CHECKPOINT_COLUMNS)
    conn.cursor().executemany(
        f"""
        INSERT OR REPLACE INTO {CHECKPOINT_TABLE} ({', '.join(CHECKPOINT_COLUMNS)})
        VALUES ({placeholders})
        """,
        rows,
    )
    return len(rows)
```
Make the legacy migration insert-only.

`ensure_checkpoint_table()` runs on every invocation, but `_migrate_checkpoints_from_main_db()` currently replays legacy rows with `INSERT OR REPLACE`. Since `save_checkpoint()` no longer updates the main table, a later startup can overwrite a newer sidecar row back to the stale main-DB status.
Suggested change:

```diff
-    conn.cursor().executemany(
-        f"""
-        INSERT OR REPLACE INTO {CHECKPOINT_TABLE} ({', '.join(CHECKPOINT_COLUMNS)})
-        VALUES ({placeholders})
-        """,
-        rows,
-    )
+    conn.cursor().executemany(
+        f"""
+        INSERT OR IGNORE INTO {CHECKPOINT_TABLE} ({', '.join(CHECKPOINT_COLUMNS)})
+        VALUES ({placeholders})
+        """,
+        rows,
+    )
```

Also applies to: 242-249
```python
def save_checkpoint(store: VectorStore, batch_id: str, **kwargs) -> None:
    """Insert or update a checkpoint row in the sidecar checkpoint DB."""
    del store  # Legacy signature retained for callers; checkpoint writes avoid the main DB.
    last_error: Exception | None = None

    for attempt in range(CHECKPOINT_WRITE_MAX_RETRIES):
        conn = None
        try:
            conn = _open_checkpoint_conn()
            _ensure_checkpoint_table_in_conn(conn)
            cursor = conn.cursor()
            existing = list(
                cursor.execute(
                    f"SELECT batch_id FROM {CHECKPOINT_TABLE} WHERE batch_id = ?",
                    [batch_id],
                )
            )
            if existing:
                sets = ", ".join(f"{k} = ?" for k in kwargs)
                params = list(kwargs.values()) + [batch_id]
                cursor.execute(f"UPDATE {CHECKPOINT_TABLE} SET {sets} WHERE batch_id = ?", params)
            else:
                cols = ["batch_id"] + list(kwargs.keys())
                vals = [batch_id] + list(kwargs.values())
                placeholders = ", ".join("?" for _ in cols)
                cursor.execute(
                    f"INSERT INTO {CHECKPOINT_TABLE} ({', '.join(cols)}) VALUES ({placeholders})",
                    vals,
                )
            return
        except apsw.BusyError as exc:
            last_error = exc
            if attempt == CHECKPOINT_WRITE_MAX_RETRIES - 1:
                break
            time.sleep(CHECKPOINT_WRITE_BASE_DELAY * (2**attempt))
        finally:
            if conn is not None:
                conn.close()

    if last_error is not None:
        raise last_error
```
Avoid the SELECT/INSERT race in `save_checkpoint`.

The existence check and follow-up write are not atomic. Two concurrent backfill processes can both miss the row, then one dies on the INSERT with a constraint error that this retry loop never handles.
Suggested change:

```diff
-            existing = list(
-                cursor.execute(
-                    f"SELECT batch_id FROM {CHECKPOINT_TABLE} WHERE batch_id = ?",
-                    [batch_id],
-                )
-            )
-            if existing:
-                sets = ", ".join(f"{k} = ?" for k in kwargs)
-                params = list(kwargs.values()) + [batch_id]
-                cursor.execute(f"UPDATE {CHECKPOINT_TABLE} SET {sets} WHERE batch_id = ?", params)
-            else:
-                cols = ["batch_id"] + list(kwargs.keys())
-                vals = [batch_id] + list(kwargs.values())
-                placeholders = ", ".join("?" for _ in cols)
-                cursor.execute(
-                    f"INSERT INTO {CHECKPOINT_TABLE} ({', '.join(cols)}) VALUES ({placeholders})",
-                    vals,
-                )
+            cols = ["batch_id"] + list(kwargs.keys())
+            vals = [batch_id] + list(kwargs.values())
+            placeholders = ", ".join("?" for _ in cols)
+            updates = ", ".join(f"{col} = excluded.{col}" for col in kwargs)
+            cursor.execute(
+                f"""
+                INSERT INTO {CHECKPOINT_TABLE} ({', '.join(cols)})
+                VALUES ({placeholders})
+                ON CONFLICT(batch_id) DO UPDATE SET {updates}
+                """,
+                vals,
+            )
```
Verify each finding against the current code and only fix it if needed.
In `@scripts/cloud_backfill.py` around lines 252 - 292, The save_checkpoint
function performs a non-atomic SELECT then INSERT/UPDATE which causes a race if
two processes check-then-insert concurrently; change it to perform a single
atomic upsert against CHECKPOINT_TABLE (e.g. use SQLite's INSERT ... ON
CONFLICT(batch_id) DO UPDATE SET ... or REPLACE INTO) instead of the separate
existence check, building the column list and the conflict-target update clause
from kwargs so the write never fails with a unique-constraint error; keep the
existing retry logic around the single upsert execution and ensure the operation
is executed within the same connection/transaction and the connection is closed
in the finally block.
```python
def open_backfill_store(db_path: Path, *, allow_read_only_fallback: bool = False):
    """Open the main DB for backfill work, with read-only fallback for export-only runs."""
    try:
        return VectorStore(db_path)
    except apsw.BusyError:
        if not allow_read_only_fallback:
            raise
        print("Main DB open hit BusyError; falling back to read-only export mode.")
        return ReadOnlyBackfillStore(db_path)
```
🧩 Analysis chain

🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
printf '== Sanitizer helper definition ==\n'
rg -n -C5 --type=py 'def build_name_dictionary\(' src scripts
printf '\n== Fallback/export call chain ==\n'
rg -n -C3 --type=py 'class ReadOnlyBackfillStore|def open_backfill_store\(|def _init_sanitizer\(|def export_unenriched_chunks\(' scripts
```

Repository: EtanHey/brainlayer
Length of output: 2741
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
printf '== ReadOnlyBackfillStore full implementation ==\n'
sed -n '315,356p' scripts/cloud_backfill.py
printf '\n== _init_sanitizer full implementation ==\n'
sed -n '371,403p' scripts/cloud_backfill.py
printf '\n== build_name_dictionary full implementation ==\n'
sed -n '466,485p' src/brainlayer/pipeline/sanitize.py
```

Repository: EtanHey/brainlayer
Length of output: 4207
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
printf '== VectorStore conn attribute ==\n'
rg -n -C8 'class VectorStore' src scripts | head -40
printf '\n== export_unenriched_chunks full implementation ==\n'
sed -n '405,500p' scripts/cloud_backfill.py
printf '\n== Lines 798-823 (second location) ==\n'
sed -n '798,823p' scripts/cloud_backfill.py
```

Repository: EtanHey/brainlayer
Length of output: 6158
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
printf '== VectorStore.__init__ and conn ==\n'
rg -n -A30 'class VectorStore' src/brainlayer/vector_store.py | grep -A30 '__init__'
printf '\n== VectorStore.conn attribute ==\n'
rg -n 'self\.conn' src/brainlayer/vector_store.py | head -20
```

Repository: EtanHey/brainlayer
Length of output: 1395
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
printf '== Verify both connection types support cursor() ==\n'
python3 << 'EOF'
import sqlite3
import sys

# Check sqlite3.Connection API
print("sqlite3.Connection methods:")
conn_methods = [m for m in dir(sqlite3.Connection) if not m.startswith('_')]
print(f"  - has cursor(): {'cursor' in conn_methods}")

# Check apsw.Connection API (document it)
print("\napsw.Connection documentation:")
try:
    import apsw
    apsw_methods = [m for m in dir(apsw.Connection) if not m.startswith('_')]
    print(f"  - has cursor(): {'cursor' in apsw_methods}")
except ImportError:
    print("  (apsw not available in sandbox, but it's a known library)")
EOF

printf '\n== Check all store method calls in export chain ==\n'
printf 'Methods called on store parameter:\n'
rg -n 'store\.' scripts/cloud_backfill.py | grep -E '(export_unenriched|_init_sanitizer|open_backfill|798|803)'
```

Repository: EtanHey/brainlayer
Length of output: 392
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
printf '== Check if ReadOnlyBackfillStore.get_enrichment_stats exists ==\n'
sed -n '315,356p' scripts/cloud_backfill.py | grep -A2 'def get_enrichment_stats'
printf '\n== Verify ReadOnlyBackfillStore has close method ==\n'
sed -n '315,356p' scripts/cloud_backfill.py | grep -A1 'def close'
printf '\n== Check if export flow actually uses store methods beyond conn/close ==\n'
printf 'All store.* calls in export_unenriched_chunks:\n'
sed -n '405,500p' scripts/cloud_backfill.py | grep 'store\.'
```

Repository: EtanHey/brainlayer
Length of output: 555
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
printf '== Check if VectorStore.close exists ==\n'
rg -n 'def close' src/brainlayer/vector_store.py
printf '\n== Check what type get_enrichment_stats returns ==\n'
rg -n -A5 'def get_enrichment_stats' src/brainlayer/vector_store.py
```

Repository: EtanHey/brainlayer
Length of output: 186
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
printf '== Search for get_enrichment_stats across codebase ==\n'
rg -n 'def get_enrichment_stats' src scripts
printf '\n== Check mixin classes for get_enrichment_stats ==\n'
rg -n 'class SearchMixin\|class KGMixin\|class SessionMixin' src/brainlayer/vector_store.py
printf '\n== Check line 803 context - what store is used ==\n'
sed -n '795,810p' scripts/cloud_backfill.py
```

Repository: EtanHey/brainlayer
Length of output: 340
🏁 Script executed:

```bash
#!/bin/bash
set -euo pipefail
printf '== Verify VectorStore inherits from SessionMixin ==\n'
sed -n '56p' src/brainlayer/vector_store.py
printf '\n== Check SessionMixin.get_enrichment_stats ==\n'
sed -n '186,210p' src/brainlayer/session_repo.py
printf '\n== Summary: All store methods used in export flow ==\n'
cat << 'EOF'
Methods called on store in export flow:
1. store.conn.cursor() - line 427 in export_unenriched_chunks
2. store.get_enrichment_stats() - line 803 in run_full_backfill

ReadOnlyBackfillStore provides:
- .conn (sqlite3.Connection)
- .get_enrichment_stats() (custom implementation)
- .close()

VectorStore provides (via inheritance):
- .conn (apsw.Connection)
- .get_enrichment_stats() (from SessionMixin)
- .close()

Connection compatibility:
- Both sqlite3.Connection and apsw.Connection support .cursor()
EOF
```

Repository: EtanHey/brainlayer
Length of output: 1844
Update type hints to properly reflect that functions accept either store type.

`run_full_backfill()` can pass a `ReadOnlyBackfillStore` into `export_unenriched_chunks()` and `_init_sanitizer()`, but these functions are typed as `store: VectorStore`. This works at runtime because both `ReadOnlyBackfillStore` and `VectorStore` provide `.conn`, `.get_enrichment_stats()`, and `.close()`, and both connection types support `.cursor()`, but the type mismatch creates a type-checking problem.

Consider using a Protocol or Union type to document the actual contract these functions require (e.g., `store: VectorStore | ReadOnlyBackfillStore` or a `StoreProtocol` with just the needed methods).
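A sketch of the protocol option, using only the members the verification above confirmed both stores provide (the protocol name and exact signatures are assumptions, not the repo's code):

```python
from typing import Any, Protocol

class BackfillStoreProtocol(Protocol):
    """Structural contract shared by VectorStore and ReadOnlyBackfillStore."""

    conn: Any  # apsw.Connection or sqlite3.Connection; both expose .cursor()

    def get_enrichment_stats(self) -> dict: ...
    def close(self) -> None: ...
```

Annotating `export_unenriched_chunks(store: BackfillStoreProtocol)` then satisfies the type checker for both store types, without a union that has to be kept in sync as store variants are added.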
```python
_HYBRID_CACHE_MAX = 128  # max entries (LRU eviction)

# Module-level LRU cache: {cache_key: (result, timestamp)}
_hybrid_cache: "OrderedDict[tuple, tuple[dict, float]]" = OrderedDict()
```
Protect the module-level LRU cache with a lock.

The current check/read/move/update sequence on `_hybrid_cache` is non-atomic. Under concurrent requests, this can race and produce inconsistent cache state or intermittent `KeyError`.
Suggested fix:

```diff
 import copy
 import json
 import math
 import os
+import threading
 import time
 from collections import OrderedDict
@@
 _hybrid_cache: "OrderedDict[tuple, tuple[dict, float]]" = OrderedDict()
+_hybrid_cache_lock = threading.RLock()
@@
-    if cache_key in _hybrid_cache:
-        cached_result, cached_at = _hybrid_cache[cache_key]
-        if now - cached_at < _HYBRID_CACHE_TTL:
-            _hybrid_cache.move_to_end(cache_key)  # LRU touch
-            return _clone_hybrid_result(cached_result)
-        else:
-            del _hybrid_cache[cache_key]
+    with _hybrid_cache_lock:
+        if cache_key in _hybrid_cache:
+            cached_result, cached_at = _hybrid_cache[cache_key]
+            if now - cached_at < _HYBRID_CACHE_TTL:
+                _hybrid_cache.move_to_end(cache_key)  # LRU touch
+                return _clone_hybrid_result(cached_result)
+            else:
+                del _hybrid_cache[cache_key]
@@
-    _hybrid_cache[cache_key] = (_clone_hybrid_result(result), time.monotonic())
-    _hybrid_cache.move_to_end(cache_key)
-    if len(_hybrid_cache) > _HYBRID_CACHE_MAX:
-        _hybrid_cache.popitem(last=False)  # evict oldest
+    with _hybrid_cache_lock:
+        _hybrid_cache[cache_key] = (_clone_hybrid_result(result), time.monotonic())
+        _hybrid_cache.move_to_end(cache_key)
+        if len(_hybrid_cache) > _HYBRID_CACHE_MAX:
+            _hybrid_cache.popitem(last=False)  # evict oldest
```

Also applies to: 443-449, 671-674
```
T0 (IMMEDIATE): manual/digest sources and high-signal types (decision, mistake)
T1 (HOURLY): recent claude_code chunks (ai_code, stack_trace, user_message, assistant_text)
T2 (LAZY): old claude_code backlog (older than recency_days)
```
Test docstring overstates T0 criteria.
The header says T0 includes decision/mistake types, but the classifier/test API has no memory_type input. Please align the docstring to the actual contract.
Suggested fix

```diff
-T0 (IMMEDIATE): manual/digest sources and high-signal types (decision, mistake)
+T0 (IMMEDIATE): manual/digest sources
```
Verify each finding against the current code and only fix it if needed.
In `@tests/test_enrichment_tiers.py` around lines 4 - 6, The docstring header "T0
(IMMEDIATE): manual/digest sources and high-signal types (decision, mistake)"
overstates T0 by referencing memory_type values not accepted by the
classifier/test API; update the docstring in tests/test_enrichment_tiers.py (the
header line starting with "T0 (IMMEDIATE):") to remove the parenthetical
"decision, mistake" or replace it with wording that matches the actual contract
(e.g., "manual/digest sources and high-signal types" or "manual/digest sources
where classifier does not accept memory_type"), ensuring the test description
matches the classifier/test API inputs.
Add `EnrichmentTier` (IntEnum) and `classify_chunk_tier()` in `src/brainlayer/pipeline/enrichment_tiers.py`.

Tier logic:
- T0 IMMEDIATE — manual/digest sources (always enrich, highest priority)
- T1 HOURLY — recent claude_code within recency window (default 7 days)
- T2 LAZY — old claude_code backlog (lazy/remote batch)
- T3 EXPLICIT — youtube transcripts (never touched by default pipeline)

Implements Finding 1 from the enrichment efficiency audit: "the current HIGH_VALUE_TYPES selector covers 97.75% of all chunks — source-aware tiers are needed".

Also adds `pythonpath = ["src"]` to pytest config so the worktree src dir takes precedence over the editable install during development.

24 new TDD tests, all green. Full suite: 834 passed, 0 regressions.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
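The pytest config change mentioned above would look roughly like this — assuming the project keeps its pytest settings in `pyproject.toml` (an `ini_options` table; a `pytest.ini` would use the equivalent `pythonpath = src` key):

```toml
[tool.pytest.ini_options]
pythonpath = ["src"]
```

With this, pytest prepends the worktree's `src/` to `sys.path` before collecting tests, so local edits win over the editable install.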
- Remove unused `pytest` import from test_enrichment_tiers.py (F401)
- Sort import blocks in test_cloud_backfill.py and test_search_validation.py (I001)

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Two issues from CodeRabbit review:
1. content_type was accepted but never used, allowing 'noise' chunks to enter
the tier pipeline. Add SKIP_CONTENT_TYPES = {"noise"} and check it first.
2. classify_chunk_tier() assigned unknown sources to T1/T2 based on recency,
but get_tier_source_filter(T1) only returned {"claude_code"}, making the
two functions inconsistent. Add T1_T2_SOURCES = {"claude_code"} and route
unrecognised sources to T2 unconditionally (lazy backlog, not hourly T1).
Tests: 26/26 pass, 836 total suite passes.
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
Force-pushed 36fb246 to 87819b8
Summary

- Add `EnrichmentTier` (IntEnum T0–T3) and `classify_chunk_tier()` in `src/brainlayer/pipeline/enrichment_tiers.py`
- Motivation: `HIGH_VALUE_TYPES` covers 97.75% of all chunks, making the current selector effectively full-stream enrichment
- Add `pythonpath = ["src"]` to pytest config so the worktree src takes precedence over the editable install during development

Tier design

- T0: `manual`, `digest` sources
- T1: recent `claude_code` (≤7 days)
- T2: old `claude_code` (>7 days)
- T3: `youtube` transcripts

Test plan

- `python3 -m pytest tests/test_enrichment_tiers.py -v` — 24/24 pass
- `python3 -m pytest tests/ -q -m "not live and not slow and not integration"` — 834 passed, 0 regressions
- Follow-up PR: wire `classify_chunk_tier` into `enrich_batch` and `export_unenriched_chunks` selectors

🤖 Generated with Claude Code