feat: KG rebuild pipeline with audit fixes #67
Conversation
…ety, audit fixes
Rebuild the knowledge graph from scratch. The KG had only 46 entities from 194K
enriched chunks because the extraction hook ran with use_llm=False, use_gliner=False,
and seed_entities={} — all three extraction paths disabled.
Changes:
- Enable seed entity matching + tag-based extraction in enrichment hook
- Add extract_entities_from_tags() for zero-API-call entity extraction
- Add kg_extraction_groq.py for Groq-backed multi-chunk NER batching
- Add scripts/kg_rebuild.py (tier1 seed+tag, tier2 Groq NER)
- Add scripts/kg_dedup.py for entity dedup, alias, false positive cleanup
- Fix thread-safety: per-thread read connections via threading.local()
- Fix mention_type downgrade: explicit mentions preserved over inferred
- Fix close() leaving stale read_conn reference after closing
- Fix non-string tags crashing extract_entities_from_tags
- Fix tag normalization missing dots (node.js → nodejs)
- Update CLAUDE.md with accurate enrichment backend docs
Results: 119 entities (was 46), 153,773 entity-chunk links (was 597).
Tests: 18 new tests in test_kg_rebuild.py, 680 total passing.
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 Walkthrough

The PR introduces KG extraction with tag-based and seed-based approaches, implements a two-tier batch rebuild via Groq NER, adds entity deduplication and cleanup scripts, enables thread-local read connections for concurrent database access, and updates documentation on enrichment backend configuration and bulk DB operation safety.
Sequence Diagram(s)

    sequenceDiagram
        participant Chunk as Enriched Chunk
        participant Pipeline as Enrichment Pipeline
        participant TagExt as Tag Extractor
        participant SeedExt as Seed Extractor
        participant KGStore as KG Store
        Chunk->>Pipeline: enrichment metadata + content
        alt Tag Processing
            Pipeline->>TagExt: tags from enrichment
            TagExt->>TagExt: normalize & match tags
            TagExt->>KGStore: ExtractionResult (0.85/0.80 confidence)
        end
        alt Seed-based Extraction
            Pipeline->>SeedExt: chunk content + DEFAULT_SEED_ENTITIES
            SeedExt->>SeedExt: extract entities from text
            SeedExt->>KGStore: ExtractionResult + relationships
        end
        KGStore->>KGStore: process_extraction_result
        KGStore->>KGStore: link entities to chunk

    sequenceDiagram
        participant Store as VectorStore
        participant Tier1 as Tier 1<br/>Seed & Tags
        participant Tier2 as Tier 2<br/>Groq NER
        participant Groq as Groq API
        participant KGBackend as KG Backend
        Store->>Tier1: Tier 1: extract all chunks
        Tier1->>Tier1: batch seed/tag extraction
        Tier1->>KGBackend: process results
        Store->>Tier2: Tier 2: fetch high-importance chunks
        Tier2->>Tier2: batch chunks (chunks_per_call)
        Tier2->>Tier2: RateLimiter: throttle calls
        Tier2->>Groq: build_multi_chunk_ner_prompt
        Groq-->>Tier2: parse_multi_chunk_response
        Tier2->>KGBackend: entities + relations per chunk
        Tier2->>Tier2: save_progress (resume capable)
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
🚥 Pre-merge checks: ✅ 2 passed | ❌ 1 failed (warning)
- Fix Tier 2 pagination bug: remove OFFSET (shrinking result set skips chunks)
- Use setbusytimeout(30_000) for read connections (consistent with write conn)
- Log KG extraction failures at WARNING level (was DEBUG, invisible in prod)
- Use _read_cursor() for read-only queries in kg_dedup.py
- Remove unused imports (uuid, os, time, parse_llm_ner_response, resolve_entity)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 9
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@CLAUDE.md`:
- Around line 68-69: Add a blank line immediately after the Markdown heading "##
Bulk DB Operations (SAFETY)" so the heading is followed by an empty line (to
satisfy MD022); update the CLAUDE.md file by inserting a single blank line
between that heading and the following list item "1. **Stop enrichment workers
first** — never run bulk ops while enrichment is writing (causes WAL bloat +
potential freeze)".
In `@scripts/kg_dedup.py`:
- Around line 124-128: The deletion block removes rows from kg_entity_chunks,
kg_relations, kg_entity_aliases and kg_entities but misses deleting
corresponding embeddings in kg_vec_entities, leaving orphans; update the
deletion sequence (the cursor.execute block that uses eid) to also execute
"DELETE FROM kg_vec_entities WHERE entity_id = ?" (using the same eid parameter)
before/alongside deleting from kg_entities, and ensure this runs inside the same
transaction so vector and KG tables remain consistent.
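
A minimal sketch of the deletion order this finding asks for, assuming all deletes share one write cursor and one transaction; the relation and alias column names (source_id, target_id, entity_id) are illustrative assumptions, not taken from the actual schema:

```python
def purge_entity(cursor, eid) -> None:
    """Delete an entity plus every row that references it, vectors included."""
    cursor.execute("BEGIN")
    try:
        cursor.execute("DELETE FROM kg_entity_chunks WHERE entity_id = ?", (eid,))
        # source_id/target_id are assumed column names for kg_relations
        cursor.execute(
            "DELETE FROM kg_relations WHERE source_id = ? OR target_id = ?", (eid, eid)
        )
        cursor.execute("DELETE FROM kg_entity_aliases WHERE entity_id = ?", (eid,))
        # The extra delete the review asks for: drop the embedding row too,
        # so kg_vec_entities cannot keep orphans after the entity is gone.
        cursor.execute("DELETE FROM kg_vec_entities WHERE entity_id = ?", (eid,))
        cursor.execute("DELETE FROM kg_entities WHERE id = ?", (eid,))
        cursor.execute("COMMIT")
    except Exception:
        cursor.execute("ROLLBACK")
        raise
```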
In `@scripts/kg_rebuild.py`:
- Around line 229-277: Wrap the processing of each chunk_result in a try/except
so one malformed chunk_result cannot abort the entire parsed_results loop:
inside the for chunk_result in parsed_results loop (the block that builds
entities, relations, creates ExtractionResult and calls
process_extraction_result), catch any Exception, log the error with context
(chunk_id and the chunk_result payload) and continue to the next chunk_result;
apply the same per-item try/except pattern to the analogous block that handles
results at the later location referenced (the code that appends to
stats["entities_found"] and stats["relations_found"]) so each chunk is isolated
from failures.
- Around line 64-67: load_progress currently assumes PROGRESS_FILE contains
valid JSON and save_progress writes directly to the final path, so implement
atomic persistence: modify save_progress to write JSON to a temporary file
(e.g., PROGRESS_FILE.with_suffix(".tmp") or similar) and then atomically replace
the final file (os.replace) to avoid truncated files; update load_progress to
catch JSONDecodeError (and FileNotFoundError) and fall back to the default dict
{"tier1_done": False, "tier2_last_offset": 0, "tier2_processed": 0} so a
partial/corrupted file won't break --resume; apply these changes to the
functions named save_progress and load_progress and use the PROGRESS_FILE symbol
to locate the file handling.
- Around line 185-196: The query uses OFFSET pagination which will skip rows as
links are created because the WHERE clause filters out linked chunks; replace
OFFSET-based paging with stable keyset pagination: change the SQL in the query
variable to accept a last-seen cursor (e.g., last_importance and last_id) and
add a WHERE clause like "AND (c.importance < :last_importance OR (c.importance =
:last_importance AND c.id > :last_id))" (matching the existing ORDER BY
c.importance DESC, c.id) and remove OFFSET; update the loop that calls this
query to pass and update the cursor values (track the last row's importance and
id after each page) and repeat until fewer than LIMIT rows are returned; apply
the same change pattern to the other queries noted around the chunks of code at
the locations you mentioned (the queries used at lines 206-209 and 284-289) so
all pagination switches from LIMIT/OFFSET to keyset using last_importance and
last_id.
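
For reference, a sketch of the keyset-paginated query this finding describes, keeping the existing ORDER BY c.importance DESC, c.id; the surrounding loop and the starting sentinel values are assumptions, not the actual kg_rebuild.py code:

```python
KEYSET_QUERY = """
    SELECT c.id, c.content, c.importance
    FROM chunks c
    LEFT JOIN kg_entity_chunks ec ON c.id = ec.chunk_id
    WHERE c.summary IS NOT NULL AND c.summary != ''
      AND c.importance >= 6
      AND ec.chunk_id IS NULL
      AND c.content IS NOT NULL
      AND LENGTH(c.content) > 50
      AND (c.importance < :last_importance
           OR (c.importance = :last_importance AND c.id > :last_id))
    ORDER BY c.importance DESC, c.id
    LIMIT :limit
"""

def iter_candidate_pages(cursor, page_size: int):
    # Start above any real importance value (scale assumed 0-10) and below any id.
    last_importance, last_id = 1_000_000, 0
    while True:
        rows = list(cursor.execute(KEYSET_QUERY, {
            "last_importance": last_importance,
            "last_id": last_id,
            "limit": page_size,
        }))
        if not rows:
            return
        yield rows
        # Advance the keyset cursor past the last row actually seen; unlike OFFSET,
        # this never skips candidates that drop out of the filter as links are created.
        last_importance, last_id = rows[-1][2], rows[-1][0]
```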
- Around line 326-356: Wrap the orchestration from after store creation in a
try/finally so store.close() always runs: create db_path and store with
get_db_path() and VectorStore(...) as before, then put the logic that checks
args.stats, args.tier1/2, calls print_kg_stats, tier1_seed_and_tags,
tier2_groq_ner, and logger.info("Done!") inside a try block and move the single
store.close() into the finally block; remove the multiple early store.close()
calls so the finally always handles cleanup even if an exception occurs.
In `@src/brainlayer/pipeline/entity_extraction.py`:
- Around line 450-481: The loop over tags in entity_extraction.py currently
appends an ExtractedEntity for every matching tag, causing duplicates; fix it by
deduplicating on the normalized tag key (the same key used to lookup
norm_projects/norm_tech) before emitting entities—introduce a seen_norm_tags
set, compute tag_norm the same way (tag.lower().replace("-", "").replace("_",
"").replace(".", "")), skip if in seen_norm_tags, otherwise add to
seen_norm_tags and then append the ExtractedEntity (preserving the existing
project/technology priority and fields such as text, entity_type, start, end,
confidence, source).
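
A minimal sketch of the dedup this finding describes. The norm_projects/norm_tech lookups and the -1 spans are assumptions about the surrounding function, the ExtractedEntity class below is an illustrative stand-in for the real dataclass, and the 0.85/0.80 confidences mirror the values quoted in the walkthrough diagram:

```python
from dataclasses import dataclass

@dataclass
class ExtractedEntity:  # illustrative stand-in for the real dataclass
    text: str
    entity_type: str
    start: int
    end: int
    confidence: float
    source: str

def entities_from_tags(tags, norm_projects: set[str], norm_tech: set[str]) -> list[ExtractedEntity]:
    seen_norm_tags: set[str] = set()
    entities: list[ExtractedEntity] = []
    for tag in tags:
        if not isinstance(tag, str):
            continue  # non-string tags previously crashed extraction
        # Same normalization as the lookup keys: node.js / Node-JS / node_js -> nodejs
        tag_norm = tag.lower().replace("-", "").replace("_", "").replace(".", "")
        if tag_norm in seen_norm_tags:
            continue  # emit each normalized tag once to avoid redundant upserts
        seen_norm_tags.add(tag_norm)
        if tag_norm in norm_projects:  # project match takes priority over technology
            entities.append(ExtractedEntity(tag, "project", -1, -1, 0.85, "tag"))
        elif tag_norm in norm_tech:
            entities.append(ExtractedEntity(tag, "technology", -1, -1, 0.80, "tag"))
    return entities
```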
In `@src/brainlayer/pipeline/kg_extraction_groq.py`:
- Around line 70-80: The loop over parsed.get("chunks", []) assumes each
chunk_data is a dict and will raise if an element is malformed; update the loop
that builds results (the block using chunk_data, chunk_id, entities, relations,
and results.append) to first verify chunk_data is a dict (e.g., isinstance
check), and if not skip it or coerce to an empty dict, then safely call .get for
"chunk_id", "entities", and "relations" with your existing default values so
malformed entries do not abort processing.
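
A small sketch of the guard described above, consistent with the later commit note about skipping non-dict and empty chunk_id entries; extract_chunk_entries is a hypothetical helper name, not the module's actual function:

```python
def extract_chunk_entries(parsed: dict) -> list[dict]:
    """Keep only well-formed chunk entries from a parsed multi-chunk NER reply."""
    results: list[dict] = []
    for chunk_data in parsed.get("chunks", []):
        if not isinstance(chunk_data, dict):
            continue  # malformed element; skip instead of raising mid-batch
        chunk_id = chunk_data.get("chunk_id", "")
        if not chunk_id:
            continue  # nothing to link results back to
        results.append({
            "chunk_id": chunk_id,
            "entities": chunk_data.get("entities", []),
            "relations": chunk_data.get("relations", []),
        })
    return results
```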
In `@src/brainlayer/vector_store.py`:
- Around line 503-519: The close() path currently only closes the current
thread's self._local.read_conn and misses per-thread read connections created by
_get_read_conn; to fix, add a thread-safe registry (e.g., self._read_conns) that
_get_read_conn appends each new apsw.Connection to (use weakref.WeakSet or store
weakrefs and protect with a lock) and then update close() to iterate this
registry and close every connection found, clearing the registry and removing
any references; ensure you still set/clear self._local.read_conn as before and
guard registry mutations with a threading.Lock to avoid races.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (9)
CLAUDE.md, scripts/kg_dedup.py, scripts/kg_rebuild.py, src/brainlayer/kg_repo.py, src/brainlayer/pipeline/enrichment.py, src/brainlayer/pipeline/entity_extraction.py, src/brainlayer/pipeline/kg_extraction_groq.py, src/brainlayer/vector_store.py, tests/test_kg_rebuild.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: test (3.11)
- GitHub Check: test (3.13)
- GitHub Check: test (3.12)
🧰 Additional context used
📓 Path-based instructions (5)

tests/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
- Run tests with `pytest`
Files:
- tests/test_kg_rebuild.py

src/brainlayer/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
- Use Typer CLI framework for command-line interface implementation in `src/brainlayer/`
- Handle concurrency by retrying on `SQLITE_BUSY` errors; each worker should use its own database connection
- Export brain graph as JSON for Next.js dashboard using `brainlayer brain-export` command
- Export Obsidian vault with Markdown files including backlinks and tags using `brainlayer export-obsidian` command
Files:
- src/brainlayer/pipeline/kg_extraction_groq.py, src/brainlayer/kg_repo.py, src/brainlayer/pipeline/enrichment.py, src/brainlayer/pipeline/entity_extraction.py, src/brainlayer/vector_store.py

src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
- Use `ruff check src/` for linting and `ruff format src/` for code formatting
Files:
- src/brainlayer/pipeline/kg_extraction_groq.py, src/brainlayer/kg_repo.py, src/brainlayer/pipeline/enrichment.py, src/brainlayer/pipeline/entity_extraction.py, src/brainlayer/vector_store.py

src/brainlayer/**/*enrich*.py
📄 CodeRabbit inference engine (CLAUDE.md)
- Use either Ollama (`glm4`) or MLX as enrichment backends, configurable via `BRAINLAYER_ENRICH_BACKEND` environment variable
- Set `"think": false` for GLM-4.7 enrichment backend for speed optimization
- Enrichment should add metadata: summary, tags, importance, and intent; capture decisions and corrections in session enrichment
- Cache prompts in `~/.local/share/brainlayer/prompts/`
- Use enrichment lock file at `/tmp/brainlayer-enrichment.lock` to prevent concurrent enrichment
Files:
- src/brainlayer/pipeline/enrichment.py

src/brainlayer/vector_store.py
📄 CodeRabbit inference engine (CLAUDE.md)
- Use APSW with sqlite-vec for vector storage implementation in `vector_store.py`
- Store database at `~/.local/share/brainlayer/brainlayer.db` with WAL mode and `busy_timeout=5000`
Files:
- src/brainlayer/vector_store.py
🧠 Learnings (12)
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/**/*chunk*.py : Preserve verbatim chunks for types: `ai_code`, `stack_trace`, `user_message` in classification and chunking
Applied to files:
src/brainlayer/kg_repo.pyCLAUDE.md
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/**/*enrich*.py : Enrichment should add metadata: summary, tags, importance, and intent; capture decisions and corrections in session enrichment
Applied to files:
src/brainlayer/pipeline/enrichment.pysrc/brainlayer/pipeline/entity_extraction.pyCLAUDE.md
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/**/*enrich*.py : Use either Ollama (`glm4`) or MLX as enrichment backends, configurable via `BRAINLAYER_ENRICH_BACKEND` environment variable
Applied to files:
src/brainlayer/pipeline/enrichment.pyCLAUDE.md
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Follow the Extract -> Classify -> Chunk -> Embed -> Index pipeline workflow
Applied to files:
CLAUDE.md
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/**/*chunk*.py : Skip noise chunks; summarize build_log chunks; extract structure only from dir_listing chunks
Applied to files:
CLAUDE.md
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Implement post-processing including enrichment, brain graph construction, and Obsidian export
Applied to files:
CLAUDE.md
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/**/*chunk*.py : Use AST-aware chunking with tree-sitter; never split stack traces; mask large tool output
Applied to files:
CLAUDE.md
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/daemon.py : Implement FastAPI daemon in `daemon.py` with core endpoints: `/health`, `/stats`, `/search`, `/context/{chunk_id}`, `/session/{session_id}`
Applied to files:
CLAUDE.md
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/**/*enrich*.py : Use enrichment lock file at `/tmp/brainlayer-enrichment.lock` to prevent concurrent enrichment
Applied to files:
CLAUDE.md
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/**/*.py : Handle concurrency by retrying on `SQLITE_BUSY` errors; each worker should use its own database connection
Applied to files:
CLAUDE.mdsrc/brainlayer/vector_store.py
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/vector_store.py : Use APSW with sqlite-vec for vector storage implementation in `vector_store.py`
Applied to files:
src/brainlayer/vector_store.py
📚 Learning: 2026-02-28T09:42:45.691Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-02-28T09:42:45.691Z
Learning: Applies to src/brainlayer/vector_store.py : Store database at `~/.local/share/brainlayer/brainlayer.db` with WAL mode and `busy_timeout=5000`
Applied to files:
src/brainlayer/vector_store.py
🧬 Code graph analysis (3)

tests/test_kg_rebuild.py (6)
- src/brainlayer/pipeline/entity_extraction.py (2): extract_entities_from_tags (435-481), parse_llm_ner_response (140-205)
- src/brainlayer/vector_store.py (3): VectorStore (56-679), close (664-673), _read_cursor (521-523)
- src/brainlayer/pipeline/kg_extraction_groq.py (2): build_multi_chunk_ner_prompt (35-54), parse_multi_chunk_response (57-82)
- src/brainlayer/pipeline/kg_extraction.py (1): extract_kg_from_chunk (126-174)
- src/brainlayer/cli/__init__.py (1): stats (269-273)
- src/brainlayer/kg_repo.py (2): upsert_entity (13-86), link_entity_chunk (144-168)

src/brainlayer/pipeline/enrichment.py (2)
- src/brainlayer/pipeline/entity_extraction.py (2): extract_entities_from_tags (435-481), ExtractionResult (45-51)
- src/brainlayer/pipeline/kg_extraction.py (2): extract_kg_from_chunk (126-174), process_extraction_result (34-123)

scripts/kg_dedup.py (5)
- src/brainlayer/paths.py (1): get_db_path (23-39)
- src/brainlayer/pipeline/entity_resolution.py (1): merge_entities (139-195)
- src/brainlayer/vector_store.py (2): VectorStore (56-679), close (664-673)
- tests/test_kg_rebuild.py (1): store (163-167)
- src/brainlayer/kg_repo.py (3): get_entity_by_name (201-230), get_entity_by_alias (474-499), add_entity_alias (461-472)
🪛 markdownlint-cli2 (0.21.0)
CLAUDE.md
[warning] 68-68: Headings should be surrounded by blank lines
Expected: 1; Actual: 0; Below
(MD022, blanks-around-headings)
🔇 Additional comments (6)

src/brainlayer/kg_repo.py (1)
- 162-165: Good conflict policy for `mention_type` preservation. This prevents inferred updates from downgrading existing explicit links, which is the right behavior for provenance fidelity.

src/brainlayer/vector_store.py (1)
- 503-523: Per-thread readonly connection setup looks correct. Using `threading.local()` plus a readonly connection per worker is a solid fix for the busy-in-another-thread failure mode.

src/brainlayer/pipeline/enrichment.py (1)
- 841-871: Tier-1 KG hook integration looks good. Seed extraction plus tag extraction is integrated cleanly and keeps failures non-fatal to enrichment throughput.

tests/test_kg_rebuild.py (1)
- 21-297: Great coverage for the KG rebuild path. These tests exercise the core regressions and integration points introduced by this PR (tag extraction, Groq parsing, mention-type preservation, and seed-based extraction).

src/brainlayer/pipeline/kg_extraction_groq.py (1)
- 149-158: 429 handling/backoff logic is well implemented. Good use of `retry-after` with jitter plus bounded exponential fallback (a sketch of this pattern follows these comments).

scripts/kg_rebuild.py (1)
- 107-146: Tier 1 dedup + confidence selection is clean and effective. Merging seed/tag entities and retaining the highest-confidence `(text, entity_type)` before KG ingestion is a strong implementation.
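
For reference, a sketch of the retry-after-plus-jitter backoff praised in the kg_extraction_groq.py comment above; the function name and header handling are illustrative, not the module's actual code:

```python
import random
import time

def sleep_with_backoff(attempt: int, retry_after: str | None = None,
                       base: float = 2.0, cap: float = 60.0) -> None:
    """Prefer the server's Retry-After hint; otherwise back off exponentially, capped, with jitter."""
    if retry_after:
        try:
            delay = float(retry_after)
        except ValueError:
            delay = base
    else:
        delay = min(cap, base * (2 ** attempt))
    time.sleep(delay + random.uniform(0, 1))  # jitter de-synchronizes concurrent retries
```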
    ## Bulk DB Operations (SAFETY)
    1. **Stop enrichment workers first** — never run bulk ops while enrichment is writing (causes WAL bloat + potential freeze)
Add a blank line after the heading.
## Bulk DB Operations (SAFETY) should be followed by an empty line to satisfy MD022.
🧰 Tools
🪛 markdownlint-cli2 (0.21.0)
[warning] 68-68: Headings should be surrounded by blank lines
Expected: 1; Actual: 0; Below
(MD022, blanks-around-headings)
    def load_progress() -> dict:
        if PROGRESS_FILE.exists():
            return json.loads(PROGRESS_FILE.read_text())
        return {"tier1_done": False, "tier2_last_offset": 0, "tier2_processed": 0}
Harden progress persistence against partial writes.
save_progress() writes directly to the final file and load_progress() assumes valid JSON. A truncated write can break --resume entirely. Use atomic replace and JSON-fallback handling.
Proposed fix
def load_progress() -> dict:
- if PROGRESS_FILE.exists():
- return json.loads(PROGRESS_FILE.read_text())
- return {"tier1_done": False, "tier2_last_offset": 0, "tier2_processed": 0}
+ default = {"tier1_done": False, "tier2_last_offset": 0, "tier2_processed": 0}
+ if not PROGRESS_FILE.exists():
+ return default
+ try:
+ return json.loads(PROGRESS_FILE.read_text())
+ except (json.JSONDecodeError, OSError):
+ logger.warning("Invalid progress file at %s; restarting Tier 2 progress.", PROGRESS_FILE)
+ return default
def save_progress(progress: dict):
- PROGRESS_FILE.write_text(json.dumps(progress, indent=2))
+ tmp = PROGRESS_FILE.with_suffix(PROGRESS_FILE.suffix + ".tmp")
+ tmp.write_text(json.dumps(progress, indent=2))
+ tmp.replace(PROGRESS_FILE)

Also applies to: 70-71
    query = """
        SELECT c.id, c.content
        FROM chunks c
        LEFT JOIN kg_entity_chunks ec ON c.id = ec.chunk_id
        WHERE c.summary IS NOT NULL AND c.summary != ''
          AND c.importance >= 6
          AND ec.chunk_id IS NULL
          AND c.content IS NOT NULL
          AND LENGTH(c.content) > 50
        ORDER BY c.importance DESC, c.id
        LIMIT ? OFFSET ?
    """
OFFSET pagination will skip chunks as links are created.
Because the query excludes already-linked chunks (ec.chunk_id IS NULL), each processed batch mutates the result set. Incrementing OFFSET then skips remaining candidates.
Proposed fix (stable keyset pagination)
- progress = load_progress() if resume else {"tier2_last_offset": 0, "tier2_processed": 0}
- start_offset = progress.get("tier2_last_offset", 0)
+ progress = load_progress() if resume else {"tier2_last_id": 0, "tier2_processed": 0}
+ last_id = progress.get("tier2_last_id", 0)
@@
- query = """
+ query = """
SELECT c.id, c.content
FROM chunks c
LEFT JOIN kg_entity_chunks ec ON c.id = ec.chunk_id
WHERE c.summary IS NOT NULL AND c.summary != ''
AND c.importance >= 6
AND ec.chunk_id IS NULL
AND c.content IS NOT NULL
AND LENGTH(c.content) > 50
- ORDER BY c.importance DESC, c.id
- LIMIT ? OFFSET ?
+ AND c.id > ?
+ ORDER BY c.id
+ LIMIT ?
"""
@@
- offset = start_offset
while stats["chunks_processed"] < limit:
- rows = list(cursor.execute(query, (chunks_per_call, offset)))
+ rows = list(cursor.execute(query, (last_id, chunks_per_call)))
if not rows:
logger.info("No more unprocessed chunks")
break
@@
- offset += chunks_per_call
+ last_id = rows[-1][0]
@@
- progress["tier2_last_offset"] = offset
+ progress["tier2_last_id"] = last_id
progress["tier2_processed"] = stats["chunks_processed"]
save_progress(progress)

Also applies to: 206-209, 284-289
    for chunk_result in parsed_results:
        chunk_id = chunk_result["chunk_id"]
        # Find the original content for span matching
        content = ""
        for c in chunks:
            if c["id"] == chunk_id:
                content = c["content"]
                break

        entities = []
        for ent_data in chunk_result.get("entities", []):
            text = ent_data.get("text", "")
            etype = ent_data.get("type", "")
            if not text or not etype:
                continue
            # Find span in content
            idx = content.lower().find(text.lower()) if content else -1
            entities.append(ExtractedEntity(
                text=text,
                entity_type=etype,
                start=idx,
                end=idx + len(text) if idx >= 0 else -1,
                confidence=0.75,
                source="llm",
            ))

        relations = []
        for rel_data in chunk_result.get("relations", []):
            source = rel_data.get("source", "")
            target = rel_data.get("target", "")
            rtype = rel_data.get("type", "")
            if source and target and rtype:
                relations.append(ExtractedRelation(
                    source_text=source,
                    target_text=target,
                    relation_type=rtype,
                    confidence=0.70,
                ))

        if entities or relations:
            result = ExtractionResult(
                entities=entities,
                relations=relations,
                chunk_id=chunk_id,
            )
            kg_stats = process_extraction_result(store, result)
            stats["entities_found"] += kg_stats["entities_created"]
            stats["relations_found"] += kg_stats["relations_created"]
Isolate failures per chunk result to avoid dropping whole batches.
A single bad chunk_result currently aborts the entire parsed batch. Catch exceptions inside the loop so other chunk results in the same Groq response still get ingested.
Proposed fix
- for chunk_result in parsed_results:
- chunk_id = chunk_result["chunk_id"]
- # Find the original content for span matching
- content = ""
- for c in chunks:
- if c["id"] == chunk_id:
- content = c["content"]
- break
-
- entities = []
- for ent_data in chunk_result.get("entities", []):
- text = ent_data.get("text", "")
- etype = ent_data.get("type", "")
- if not text or not etype:
- continue
- # Find span in content
- idx = content.lower().find(text.lower()) if content else -1
- entities.append(ExtractedEntity(
- text=text,
- entity_type=etype,
- start=idx,
- end=idx + len(text) if idx >= 0 else -1,
- confidence=0.75,
- source="llm",
- ))
-
- relations = []
- for rel_data in chunk_result.get("relations", []):
- source = rel_data.get("source", "")
- target = rel_data.get("target", "")
- rtype = rel_data.get("type", "")
- if source and target and rtype:
- relations.append(ExtractedRelation(
- source_text=source,
- target_text=target,
- relation_type=rtype,
- confidence=0.70,
- ))
-
- if entities or relations:
- result = ExtractionResult(
- entities=entities,
- relations=relations,
- chunk_id=chunk_id,
- )
- kg_stats = process_extraction_result(store, result)
- stats["entities_found"] += kg_stats["entities_created"]
- stats["relations_found"] += kg_stats["relations_created"]
+ for chunk_result in parsed_results:
+ try:
+ chunk_id = chunk_result["chunk_id"]
+ content = ""
+ for c in chunks:
+ if c["id"] == chunk_id:
+ content = c["content"]
+ break
+
+ entities = []
+ for ent_data in chunk_result.get("entities", []):
+ text = ent_data.get("text", "")
+ etype = ent_data.get("type", "")
+ if not text or not etype:
+ continue
+ idx = content.lower().find(text.lower()) if content else -1
+ entities.append(ExtractedEntity(
+ text=text,
+ entity_type=etype,
+ start=idx,
+ end=idx + len(text) if idx >= 0 else -1,
+ confidence=0.75,
+ source="llm",
+ ))
+
+ relations = []
+ for rel_data in chunk_result.get("relations", []):
+ source = rel_data.get("source", "")
+ target = rel_data.get("target", "")
+ rtype = rel_data.get("type", "")
+ if source and target and rtype:
+ relations.append(ExtractedRelation(
+ source_text=source,
+ target_text=target,
+ relation_type=rtype,
+ confidence=0.70,
+ ))
+
+ if entities or relations:
+ result = ExtractionResult(
+ entities=entities,
+ relations=relations,
+ chunk_id=chunk_id,
+ )
+ kg_stats = process_extraction_result(store, result)
+ stats["entities_found"] += kg_stats["entities_created"]
+ stats["relations_found"] += kg_stats["relations_created"]
+ except Exception:
+ logger.exception("Error processing parsed Groq result: %s", chunk_result)
+ stats["errors"] += 1

Also applies to: 280-283
    db_path = get_db_path()
    logger.info("Using DB: %s", db_path)
    store = VectorStore(db_path)

    if args.stats:
        print_kg_stats(store)
        store.close()
        return

    if not args.tier1 and not args.tier2:
        parser.print_help()
        print("\nSpecify --tier1, --tier2, or both.")
        store.close()
        return

    print_kg_stats(store)

    if args.tier1:
        tier1_stats = tier1_seed_and_tags(store)
        print_kg_stats(store)

    if args.tier2:
        tier2_stats = tier2_groq_ner(
            store, limit=args.limit,
            chunks_per_call=args.chunks_per_call,
            resume=args.resume,
        )
        print_kg_stats(store)

    store.close()
    logger.info("Done!")
🧹 Nitpick | 🔵 Trivial
Wrap main workflow in try/finally to guarantee connection cleanup.
store.close() is handled for expected paths, but an unexpected exception can bypass it. Use try/finally around the orchestration block.
Proposed refactor
db_path = get_db_path()
logger.info("Using DB: %s", db_path)
store = VectorStore(db_path)
-
- if args.stats:
- print_kg_stats(store)
- store.close()
- return
-
- if not args.tier1 and not args.tier2:
- parser.print_help()
- print("\nSpecify --tier1, --tier2, or both.")
- store.close()
- return
-
- print_kg_stats(store)
-
- if args.tier1:
- tier1_stats = tier1_seed_and_tags(store)
- print_kg_stats(store)
-
- if args.tier2:
- tier2_stats = tier2_groq_ner(
- store, limit=args.limit,
- chunks_per_call=args.chunks_per_call,
- resume=args.resume,
- )
- print_kg_stats(store)
-
- store.close()
- logger.info("Done!")
+ try:
+ if args.stats:
+ print_kg_stats(store)
+ return
+
+ if not args.tier1 and not args.tier2:
+ parser.print_help()
+ print("\nSpecify --tier1, --tier2, or both.")
+ return
+
+ print_kg_stats(store)
+
+ if args.tier1:
+ tier1_stats = tier1_seed_and_tags(store)
+ print_kg_stats(store)
+
+ if args.tier2:
+ tier2_stats = tier2_groq_ner(
+ store, limit=args.limit,
+ chunks_per_call=args.chunks_per_call,
+ resume=args.resume,
+ )
+ print_kg_stats(store)
+
+ logger.info("Done!")
+ finally:
+ store.close()
        # Thread-local storage for per-thread read connections.
        # APSW connections are NOT thread-safe — each thread needs its own.
        # This prevents "Connection is busy in another thread" when parallel
        # MCP tool calls (e.g., brain_search) hit the same VectorStore.
        self._local = threading.local()

    def _get_read_conn(self) -> apsw.Connection:
        """Get or create a per-thread readonly connection."""
        conn = getattr(self._local, "read_conn", None)
        if conn is None:
            conn = apsw.Connection(str(self.db_path), flags=apsw.SQLITE_OPEN_READONLY)
            conn.enableloadextension(True)
            conn.loadextension(sqlite_vec.loadable_path())
            conn.enableloadextension(False)
            conn.cursor().execute("PRAGMA busy_timeout = 5000")
            self._local.read_conn = conn
        return conn
🧩 Analysis chain
🏁 Script executed:

    # First, let's explore the repository structure and locate the file
    find . -name "vector_store.py" -type f | head -5

Repository: EtanHey/brainlayer
Length of output: 94

🏁 Script executed:

    # Get the size of vector_store.py to understand its scope
    wc -l src/brainlayer/vector_store.py

Repository: EtanHey/brainlayer
Length of output: 96

🏁 Script executed:

    # Examine the specific lines 503-519 and surrounding context
    sed -n '495,525p' src/brainlayer/vector_store.py

Repository: EtanHey/brainlayer
Length of output: 1520

🏁 Script executed:

    # Look for the close() method and lines 666-671
    sed -n '660,680p' src/brainlayer/vector_store.py

Repository: EtanHey/brainlayer
Length of output: 677

🏁 Script executed:

    # Search for all references to _get_read_conn and close methods
    rg -n "def _get_read_conn|def close\(" src/brainlayer/vector_store.py -A 20

Repository: EtanHey/brainlayer
Length of output: 1693

🏁 Script executed:

    # Check if there are any existing registries or tracking of connections
    rg -n "_read_conns|WeakSet|connections.*dict|connection.*list" src/brainlayer/vector_store.py

Repository: EtanHey/brainlayer
Length of output: 44
close() does not close read connections created in other threads.
Current cleanup only touches the caller thread's self._local.read_conn. If this VectorStore instance is read from multiple threads, other per-thread connections can remain open until thread teardown/GC.
Suggested fix
class VectorStore(SearchMixin, KGMixin, SessionMixin):
@@
- self._local = threading.local()
+ self._local = threading.local()
+ self._read_conns: set[apsw.Connection] = set()
+ self._read_conns_lock = threading.Lock()
@@
def _get_read_conn(self) -> apsw.Connection:
"""Get or create a per-thread readonly connection."""
conn = getattr(self._local, "read_conn", None)
if conn is None:
conn = apsw.Connection(str(self.db_path), flags=apsw.SQLITE_OPEN_READONLY)
@@
conn.setbusytimeout(30_000)
self._local.read_conn = conn
+ with self._read_conns_lock:
+ self._read_conns.add(conn)
return conn
@@
def close(self) -> None:
"""Close database connections."""
- # Close thread-local read connection if it exists
- if hasattr(self, "_local"):
- read_conn = getattr(self._local, "read_conn", None)
- if read_conn is not None:
- read_conn.close()
- self._local.read_conn = None
+ if hasattr(self, "_local"):
+ read_conn = getattr(self._local, "read_conn", None)
+ if read_conn is not None:
+ read_conn.close()
+ self._local.read_conn = None
+ if hasattr(self, "_read_conns_lock") and hasattr(self, "_read_conns"):
+ with self._read_conns_lock:
+ for c in list(self._read_conns):
+ try:
+ c.close()
+ except Exception:
+ pass
+ self._read_conns.clear()
if hasattr(self, "conn"):
self.conn.close()
- Clean up kg_vec_entities when deleting false positive entities (orphan rows)
- Skip non-dict and empty chunk_id entries in parse_multi_chunk_response
- Deduplicate normalized tags before emitting entities (avoid redundant upserts)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…ity data

Add KG overlay to brain-export graph.json:
- 109 entity nodes (person, project, technology, company, golem, topic) positioned at centroids of their connected sessions
- 436 entity→session edges based on kg_entity_chunks relevance
- 16 entity↔entity relation edges from kg_relations
- 3895/4378 sessions enriched with top_entities list
- New node fields: node_type, entity_type, top_entities, description
- New edge fields: edge_type, relation_type, fact
- Full backward compatibility — existing dashboard fields unchanged

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Summary
- use_llm=False, use_gliner=False, seed_entities={} (triple disabled)

Changes

Core fixes
- DEFAULT_SEED_ENTITIES + tag-based extraction in the enrichment hook
- explicit mention_type — don't let inferred overwrite it on conflict
- threading.local(), clear reference on close()
- extract_entities_from_tags(), skip non-string tags, normalize dots

New modules

Results

Test plan
- test_kg_rebuild.py covering tag extraction, Groq NER parsing, enrichment hook, mention_type preservation, close() cleanup

🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes
Documentation
Tests
Performance