feat: KG entity quality — validation, prompts, cleanup (#69)
Conversation
… script (#A7)

Add post-extraction validation that fixes the most common KG data quality issues: entity type coercion (agent patterns, known projects/tech), relation direction validation with auto-swap, self-referential relation filtering, canonical relation type normalization, and entity importance computation from chunk links.

- Validation layer in kg_extraction.py (validate_extraction_result + compute_entity_importance)
- Improved Groq NER prompt with entity type guidance, direction rules, fact field
- Improved base NER prompt with agent type and direction constraints
- Expanded seed entities (golem → agent type, 13 agents, new people/projects)
- Data cleanup script (scripts/kg_cleanup.py) for fixing existing misclassifications
- 22 new tests in test_kg_quality.py (entity coercion, direction swap, normalization, fact propagation, importance)

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
📝 Walkthrough

Adds a KG cleanup CLI script, extends extraction/NER prompts and seed entities, introduces relation canonicalization, direction validation, fact propagation, and entity importance computation, and adds comprehensive tests for KG quality improvements.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Extractor as Extraction<br/>(NER/GROQ)
    participant Validator as validate_extraction_<br/>result()
    participant Store as VectorStore<br/>(KG DB)
    participant Importance as compute_entity_<br/>importance()
    participant CLI as kg_cleanup.py
    Extractor->>Validator: ExtractionResult (entities, relations, facts)
    Validator->>Validator: Coerce entity types, normalize relation types
    Validator->>Validator: Validate/swap directions, drop self-relations
    Validator->>Store: Add/update entities & relations (include fact)
    CLI->>Store: Read/prepare (dry-run) or apply updates (with WAL checkpoint)
    CLI->>Importance: Trigger recompute (optional)
    Importance->>Store: Read chunk links & relations -> update importance
    CLI->>CLI: Print stats (entity/relation counts, importance distribution)
```
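The `Importance` participant above recomputes scores from chunk links. A minimal sketch of that idea, assuming a hypothetical `kg_entity_chunks` link table — the real `compute_entity_importance` in `kg_extraction.py` may use a different schema and weighting:

```python
import sqlite3


def compute_importance(conn: sqlite3.Connection) -> dict[str, float]:
    """Score each entity by how many distinct chunks link to it,
    normalized against the most-linked entity."""
    rows = conn.execute(
        "SELECT entity_id, COUNT(DISTINCT chunk_id) AS n "
        "FROM kg_entity_chunks GROUP BY entity_id"
    ).fetchall()
    if not rows:
        return {}
    max_links = max(n for _, n in rows)
    return {entity_id: n / max_links for entity_id, n in rows}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kg_entity_chunks (entity_id TEXT, chunk_id TEXT)")
conn.executemany(
    "INSERT INTO kg_entity_chunks VALUES (?, ?)",
    [("e1", "c1"), ("e1", "c2"), ("e1", "c3"), ("e2", "c1")],
)
scores = compute_importance(conn)
# e1 is linked from 3 distinct chunks, e2 from 1, so e1 gets the top score
```

Relation counts could be folded into the same score; the normalization here is the simplest choice that yields a non-static importance distribution.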
Estimated Code Review Effort: 🎯 3 (Moderate) | ⏱️ ~25 minutes

Possibly Related PRs
🚥 Pre-merge checks | ✅ Passed checks (3 passed)
Actionable comments posted: 6
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@scripts/kg_cleanup.py`:
- Around line 250-277: Add a pre-operation WAL checkpoint before performing the
bulk cleanup writes: before calling fix_entity_types and fix_relations (and
before compute_entity_importance when not dry_run), call
store.conn.cursor().execute("PRAGMA wal_checkpoint(FULL)") and log the action
(similar to the existing post-checkpoint). Only run this pre-checkpoint when not
dry_run, mirroring the existing post-checkpoint behavior, so place it just
before the "Fix entity types" section and ensure you reference the same store
connection and dry_run flag used by fix_entity_types, fix_relations, and
compute_entity_importance.
In `@src/brainlayer/pipeline/batch_extraction.py`:
- Around line 57-63: The seed list contains case-duplicate entries (e.g.,
"contentClaude" vs "ContentClaude") while extract_seed_entities
performs case-insensitive matching; update the agent seed definition used by
extract_seed_entities to deduplicate case-insensitively—either remove the
duplicate entries manually (remove "ContentClaude" or "contentClaude") or
programmatically normalize the seed list (map to lower() and use a set) before
passing it to extract_seed_entities so redundant matches are avoided.
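The programmatic option can be sketched as a standalone helper (not the repo's actual code) that keeps the first spelling and the original order:

```python
def dedupe_case_insensitive(seeds: list[str]) -> list[str]:
    """Drop later entries whose lowercase form was already seen."""
    seen: set[str] = set()
    out: list[str] = []
    for name in seeds:
        key = name.lower()
        if key not in seen:
            seen.add(key)
            out.append(name)
    return out


seeds = ["contentGolem", "ContentClaude", "contentClaude", "Golem"]
print(dedupe_case_insensitive(seeds))  # ['contentGolem', 'ContentClaude', 'Golem']
```

Preserving order (rather than using a plain `set`) matters if seed priority affects matching downstream.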
In `@src/brainlayer/pipeline/entity_extraction.py`:
- Around line 133-135: The parser parse_llm_ner_response currently drops
top-level "fact" from LLM output; update parse_llm_ner_response to read and
preserve a top-level "fact" key from the JSON response and include it in the
returned data structure (e.g., attach to the overall return dict and propagate
into relation entries that do not have their own "fact"); ensure JSON parsing
handles optional top-level "fact" and that any relations missing their own
"fact" get assigned the preserved top-level fact so downstream code (relation
handling) receives the fact text.
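The propagation behavior described above can be sketched with plain `json`; the payload shape beyond the `fact`, `relations`, and `properties` keys is an assumption, not the actual LLM contract:

```python
import json


def parse_response(raw: str) -> dict:
    """Parse LLM NER JSON, copying a top-level "fact" into any relation
    that does not already carry its own."""
    data = json.loads(raw)
    top_fact = data.get("fact")
    for rel in data.get("relations", []):
        props = rel.get("properties")
        if not isinstance(props, dict):  # guard against non-dict properties
            props = {}
            rel["properties"] = props
        if top_fact and not rel.get("fact") and "fact" not in props:
            props["fact"] = top_fact
    return data


raw = '{"fact": "A works at B", "relations": [{"source": "A", "target": "B", "relation_type": "works_at"}]}'
parsed = parse_response(raw)
# the top-level fact now sits on the relation's properties dict
```

Storing the fact in `properties` keeps downstream code that only reads `properties["fact"]` working without a schema change.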
In `@src/brainlayer/pipeline/kg_extraction.py`:
- Line 49: The "works_at" validation rule currently allows person/agent ->
project which contradicts the prompt contract; update the tuple for "works_at"
in the KG validation mapping in kg_extraction.py: change the allowed
source/target sets to reflect person -> company only (i.e., set the first
element to {"person"} and the second element to {"company"}) so "works_at" edges
validate strictly as person → company.
In `@tests/test_kg_quality.py`:
- Around line 287-337: The test reveals parse_llm_ner_response may drop a
top-level "fact" key on relations so downstream code only reads
properties["fact"]; update parse_llm_ner_response (the function that builds
ExtractedRelation objects) to propagate any top-level "fact" into the
ExtractedRelation instance (either set rel.fact attribute or copy into
rel.properties["fact"]) and ensure process_extraction_result (which inserts into
kg_relations) reads the relation.fact attribute first and falls back to
relation.properties.get("fact") when populating the DB fact column; modify the
creation/serialization paths for ExtractedRelation so the top-level fact is not
lost.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (7)
scripts/kg_cleanup.py
src/brainlayer/pipeline/batch_extraction.py
src/brainlayer/pipeline/entity_extraction.py
src/brainlayer/pipeline/kg_extraction.py
src/brainlayer/pipeline/kg_extraction_groq.py
tests/test_batch_extraction.py
tests/test_kg_quality.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: test (3.13)
- GitHub Check: test (3.12)
- GitHub Check: test (3.11)
🧰 Additional context used
📓 Path-based instructions (4)
src/brainlayer/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Use Python package structure with Typer CLI located in
src/brainlayer/
Files:
src/brainlayer/pipeline/batch_extraction.py
src/brainlayer/pipeline/kg_extraction.py
src/brainlayer/pipeline/entity_extraction.py
src/brainlayer/pipeline/kg_extraction_groq.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Lint and format Python code using `ruff check src/` and `ruff format src/`
Files:
src/brainlayer/pipeline/batch_extraction.py
src/brainlayer/pipeline/kg_extraction.py
src/brainlayer/pipeline/entity_extraction.py
src/brainlayer/pipeline/kg_extraction_groq.py
**/*test*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Run tests with `pytest`
Files:
tests/test_batch_extraction.py
tests/test_kg_quality.py
scripts/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
scripts/**/*.py: Fix 8 scripts in `scripts/` that hardcode the `brainlayer.db` path to use dynamic path resolution during DB consolidation
Checkpoint WAL with `PRAGMA wal_checkpoint(FULL)` before and after bulk database operations
Drop FTS triggers (especially `chunks_fts_delete`) before bulk deletes and recreate after — FTS triggers cause massive performance degradation on large datasets
Batch delete operations in 5-10K chunks with checkpoint every 3 batches
Never delete from the `chunks` table while an FTS trigger is active on large datasets
Files:
scripts/kg_cleanup.py
🧬 Code graph analysis (3)
src/brainlayer/pipeline/kg_extraction.py (2)
- src/brainlayer/pipeline/entity_extraction.py (3): ExtractedRelation (34-41), ExtractionResult (45-51), extract_entities_combined (495-545)
- src/brainlayer/kg_repo.py (1): add_relation (88-142)
scripts/kg_cleanup.py (3)
- src/brainlayer/paths.py (1): get_db_path (23-39)
- src/brainlayer/pipeline/kg_extraction.py (1): compute_entity_importance (129-191)
- src/brainlayer/vector_store.py (2): VectorStore (56-704), _read_cursor (546-548)
tests/test_kg_quality.py (2)
- src/brainlayer/pipeline/kg_extraction.py (3): validate_extraction_result (57-126), process_extraction_result (205-300), compute_entity_importance (129-191)
- src/brainlayer/vector_store.py (2): VectorStore (56-704), _read_cursor (546-548)
🔇 Additional comments (2)
src/brainlayer/pipeline/kg_extraction_groq.py (1)
18-49: Prompt update looks solid and aligned with KG direction/fact requirements. The revised guidance is explicit on entity typing, relation direction, and mandatory fact, which should improve extraction consistency.
tests/test_batch_extraction.py (1)
80-83: Seed taxonomy test update is correct (`agent` key). This assertion now matches the migrated seed structure and guards the intended public key.
```python
# 2. Delete self-referential relations
self_refs = list(
    cursor.execute("SELECT id, source_id FROM kg_relations WHERE source_id = target_id AND expired_at IS NULL")
)
for rel_id, _ in self_refs:
    if dry_run:
        logger.info("[DRY-RUN] Would expire self-referential relation %s", rel_id)
    else:
        write_cursor = store.conn.cursor()
        write_cursor.execute(
            "UPDATE kg_relations SET expired_at = datetime('now') WHERE id = ?",
            (rel_id,),
        )
        logger.info("Expired self-referential relation %s", rel_id)
    stats["deleted_self_ref"] += 1

# 3. Normalize non-canonical relation types to related_to
non_canonical = list(
    cursor.execute(
        "SELECT id, relation_type FROM kg_relations WHERE expired_at IS NULL AND relation_type NOT IN ({})".format(
            ",".join(f"'{t}'" for t in CANONICAL_RELATION_TYPES)
        )
    )
)
for rel_id, old_type in non_canonical:
    if dry_run:
        logger.info("[DRY-RUN] Would normalize relation type %s → related_to", old_type)
    else:
        write_cursor = store.conn.cursor()
        write_cursor.execute(
            "UPDATE kg_relations SET relation_type = 'related_to' WHERE id = ?",
            (rel_id,),
        )
        logger.info("Normalized relation type %s → related_to", old_type)
    stats["normalized"] += 1
```
Batch relation cleanup updates instead of full-materialization + per-row writes.
`self_refs` and `non_canonical` are fully loaded into memory and then updated one row at a time. On large KG datasets this can hold long write locks and degrade throughput.

⚙️ Suggested batching pattern

```diff
+def _update_relations_in_batches(store: VectorStore, rel_ids: list[str], *, batch_size: int = 5000) -> int:
+    updated = 0
+    cursor = store.conn.cursor()
+    for i in range(0, len(rel_ids), batch_size):
+        batch = rel_ids[i : i + batch_size]
+        cursor.executemany(
+            "UPDATE kg_relations SET expired_at = datetime('now') WHERE id = ?",
+            [(rid,) for rid in batch],
+        )
+        updated += len(batch)
+        if ((i // batch_size) + 1) % 3 == 0:
+            cursor.execute("PRAGMA wal_checkpoint(FULL)")
+    return updated
```

As per coding guidelines, "Batch delete operations in 5-10K chunks with checkpoint every 3 batches".
```python
# Relation direction constraints: relation_type → (valid_source_types, valid_target_types)
# If extracted direction is wrong, we swap source/target.
_RELATION_DIRECTION_RULES: dict[str, tuple[set[str], set[str]]] = {
    "works_at": ({"person", "agent"}, {"company", "project"}),
```
Align `works_at` validation to company-only targets.
The current rule accepts person/agent -> project for `works_at`, which conflicts with the prompt contract (person -> company) and lets semantically invalid `works_at` edges pass validation.

✅ Tighten the rule

```diff
-    "works_at": ({"person", "agent"}, {"company", "project"}),
+    "works_at": ({"person", "agent"}, {"company"}),
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/pipeline/kg_extraction.py` at line 49, The "works_at"
validation rule currently allows person/agent -> project which contradicts the
prompt contract; update the tuple for "works_at" in the KG validation mapping in
kg_extraction.py: change the allowed source/target sets to reflect person ->
company only (i.e., set the first element to {"person"} and the second element
to {"company"}) so "works_at" edges validate strictly as person → company.
```python
def test_extracted_relation_has_fact_field(self):
    """ExtractedRelation should support a fact attribute."""
    rel = ExtractedRelation(
        source_text="Yuval Nir",
        target_text="Etan Heyman",
        relation_type="client_of",
        confidence=0.8,
        properties={"fact": "Yuval Nir is a client of Etan Heyman"},
    )
    # fact should be accessible (either as attribute or via properties)
    fact = getattr(rel, "fact", None) or rel.properties.get("fact")
    assert fact is not None
    assert "client" in fact.lower()

def test_fact_stored_in_kg_relations(self, tmp_path):
    """When a relation has a fact, it should be stored in kg_relations.fact column."""
    from brainlayer.pipeline.kg_extraction import process_extraction_result
    from brainlayer.vector_store import VectorStore

    store = VectorStore(tmp_path / "test.db")

    # Pre-create entities so resolution finds them
    store.upsert_entity("person-yuval", "person", "Yuval Nir")
    store.upsert_entity("person-etan", "person", "Etan Heyman")

    result = ExtractionResult(
        entities=[
            ExtractedEntity(text="Yuval Nir", entity_type="person", start=0, end=9, confidence=0.9, source="seed"),
            ExtractedEntity(
                text="Etan Heyman", entity_type="person", start=15, end=26, confidence=0.9, source="seed"
            ),
        ],
        relations=[
            ExtractedRelation(
                source_text="Yuval Nir",
                target_text="Etan Heyman",
                relation_type="client_of",
                confidence=0.8,
                properties={"fact": "Yuval Nir is a client of Etan Heyman"},
            ),
        ],
        chunk_id="test-chunk",
    )
    process_extraction_result(store, result)

    cursor = store._read_cursor()
    rows = list(cursor.execute("SELECT fact FROM kg_relations WHERE relation_type = 'client_of'"))
    assert len(rows) >= 1
    assert rows[0][0] is not None
    assert "client" in rows[0][0].lower()
    store.close()
```
🧹 Nitpick | 🔵 Trivial
Add one parser-level regression test for top-level relation fact.
Current fact coverage uses `properties["fact"]` directly, so it won't catch failures where `parse_llm_ner_response` drops a top-level "fact" from LLM JSON.
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@tests/test_kg_quality.py` around lines 287 - 337, The test reveals
parse_llm_ner_response may drop a top-level "fact" key on relations so
downstream code only reads properties["fact"]; update parse_llm_ner_response
(the function that builds ExtractedRelation objects) to propagate any top-level
"fact" into the ExtractedRelation instance (either set rel.fact attribute or
copy into rel.properties["fact"]) and ensure process_extraction_result (which
inserts into kg_relations) reads the relation.fact attribute first and falls
back to relation.properties.get("fact") when populating the DB fact column;
modify the creation/serialization paths for ExtractedRelation so the top-level
fact is not lost.
…int, cleanup

- Fix parse_llm_ner_response to capture top-level "fact" from LLM JSON into properties dict (was silently dropped)
- Tighten works_at validation to company-only targets (align with prompt)
- Add pre-operation WAL checkpoint in cleanup script
- Remove duplicate ContentClaude seed (case-insensitive matching)
- Add parser-level fact regression test

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Actionable comments posted: 4
♻️ Duplicate comments (1)
scripts/kg_cleanup.py (1)
178-224: ⚠️ Potential issue | 🟠 Major
Batch relation cleanup writes instead of per-row updates.
These loops perform row-by-row UPDATEs on potentially large sets. This can hold write locks longer and does not follow the repo's bulk-operation strategy. As per coding guidelines, "Batch delete operations in 5-10K chunks with checkpoint every 3 batches".
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@scripts/kg_cleanup.py` around lines 178 - 224: the loops (iterating rows, self_refs, non_canonical and using write_cursor.execute per row) perform per-row UPDATEs. Change them to batched updates by collecting ids into chunks of 5–10k and running a single UPDATE ... WHERE id IN (...) per chunk (use the same SET expired_at = datetime('now') for deletions and SET relation_type = 'related_to' for normalization), use store.conn.execute/executemany inside a transaction, commit after each batch and perform a checkpoint every 3 batches, and update the same stats counters (stats["deleted_known"], stats["deleted_self_ref"], stats["normalized"]) by the number of rows processed per batch; refer to variables/functions cursor, write_cursor, store.conn, rows/self_refs/non_canonical and the SQL updates in the diff when locating the changes.
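The batched `UPDATE ... WHERE id IN (...)` pattern with a checkpoint every third batch can be sketched as follows; the table shape follows the snippets above, but the helper itself is hypothetical:

```python
import sqlite3


def expire_in_batches(conn: sqlite3.Connection, rel_ids: list[str], batch_size: int = 5000) -> int:
    """Expire relations in chunks: one UPDATE ... WHERE id IN (...) per chunk,
    committing each batch and checkpointing the WAL every 3 batches."""
    updated = 0
    for batch_no, i in enumerate(range(0, len(rel_ids), batch_size), start=1):
        batch = rel_ids[i : i + batch_size]
        placeholders = ",".join("?" * len(batch))
        cur = conn.execute(
            f"UPDATE kg_relations SET expired_at = datetime('now') WHERE id IN ({placeholders})",
            batch,
        )
        conn.commit()
        updated += cur.rowcount
        if batch_no % 3 == 0:
            conn.execute("PRAGMA wal_checkpoint(FULL)")
    return updated


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE kg_relations (id TEXT PRIMARY KEY, expired_at TEXT)")
conn.executemany("INSERT INTO kg_relations (id) VALUES (?)", [(f"r{i}",) for i in range(10)])
n = expire_in_batches(conn, [f"r{i}" for i in range(10)], batch_size=4)
```

A single IN-clause update per chunk keeps each write transaction short; SQLite's default host-parameter limit comfortably accommodates 5K placeholders on modern builds, but that limit is worth checking against the deployed SQLite version.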
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@scripts/kg_cleanup.py`:
- Around line 141-157: The loop over ENTITY_TYPE_FIXES in fix_entity_types only
updates the first matching row (rows[0]) so duplicates stay misclassified;
modify the logic to update all matching IDs (iterate over rows or run a single
UPDATE that targets entity_type/name) using the same write_cursor from
store.conn, update updated_at for each id (or let the bulk UPDATE set
updated_at), and adjust the fixed counter to reflect the number of rows changed;
preserve dry_run behavior by logging each would-be retype or a summary for that
key.
In `@src/brainlayer/pipeline/entity_extraction.py`:
- Around line 198-201: The code assumes props = raw_rel.get("properties") is a
dict before doing props["fact"] = fact; first verify the shape with
isinstance(props, dict) and only mutate it when true, otherwise create a new
dict (e.g., new_props = {"fact": fact}) and assign it back to
raw_rel["properties"]; ensure you update raw_rel (not just the local props
variable) so downstream code sees the corrected properties, using the local
symbols fact, props and raw_rel.get("properties") to locate the change.
In `@src/brainlayer/pipeline/kg_extraction.py`:
- Around line 107-119: The code currently swaps relation direction using
_RELATION_DIRECTION_RULES but always appends rel via
validated_relations.append(rel) even if the final (source_type, target_type) is
still invalid; modify the logic in the block handling rel.relation_type so that
after a swap you also update or recompute source_type and target_type (or swap
those variables) and then check against valid_src and valid_tgt for that
rel.relation_type, only calling validated_relations.append(rel) when the pair is
valid; drop/skip the relation when it remains invalid after correction and keep
the logging of swaps in the rel.relation_type handling.
- Around line 46-54: The relation "affiliated_with" is missing from
_RELATION_DIRECTION_RULES so reversed/invalid edges are not validated; add a new
entry for "affiliated_with" in the _RELATION_DIRECTION_RULES dict with the
correct (valid_source_types, valid_target_types) tuple matching the prompt
contract (e.g., source should be {"person","agent","company"} and target
{"company","organization"} or whatever canonical types your contract requires),
so the pipeline code that checks direction against _RELATION_DIRECTION_RULES
will detect and swap invalid source/target for affiliated_with.
---
Duplicate comments:
In `@scripts/kg_cleanup.py`:
- Around line 178-224: The loops (iterating rows, self_refs, non_canonical and
using write_cursor.execute per row) perform per-row UPDATEs; change them to
batched updates by collecting ids into chunks of 5–10k and running a single
UPDATE ... WHERE id IN (...) per chunk (use the same SET expired_at =
datetime('now') for deletions and SET relation_type = 'related_to' for
normalization), use store.conn.execute/executemany inside a transaction, commit
after each batch and perform a checkpoint every 3 batches, and update the same
stats counters (stats["deleted_known"], stats["deleted_self_ref"],
stats["normalized"]) by the number of rows processed per batch; refer to
variables/functions cursor, write_cursor, store.conn,
rows/self_refs/non_canonical and the SQL updates in the diff when locating the
changes.
ℹ️ Review info
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro
📒 Files selected for processing (5)
scripts/kg_cleanup.py
src/brainlayer/pipeline/batch_extraction.py
src/brainlayer/pipeline/entity_extraction.py
src/brainlayer/pipeline/kg_extraction.py
tests/test_kg_quality.py
📜 Review details
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (3)
- GitHub Check: test (3.11)
- GitHub Check: test (3.12)
- GitHub Check: test (3.13)
🧰 Additional context used
📓 Path-based instructions (4)
src/brainlayer/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Use Python package structure with Typer CLI located in
src/brainlayer/
Files:
src/brainlayer/pipeline/entity_extraction.py
src/brainlayer/pipeline/batch_extraction.py
src/brainlayer/pipeline/kg_extraction.py
src/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Lint and format Python code using `ruff check src/` and `ruff format src/`
Files:
src/brainlayer/pipeline/entity_extraction.py
src/brainlayer/pipeline/batch_extraction.py
src/brainlayer/pipeline/kg_extraction.py
**/*test*.py
📄 CodeRabbit inference engine (CLAUDE.md)
Run tests with `pytest`
Files:
tests/test_kg_quality.py
scripts/**/*.py
📄 CodeRabbit inference engine (CLAUDE.md)
scripts/**/*.py: Fix 8 scripts in `scripts/` that hardcode the `brainlayer.db` path to use dynamic path resolution during DB consolidation
Checkpoint WAL with `PRAGMA wal_checkpoint(FULL)` before and after bulk database operations
Drop FTS triggers (especially `chunks_fts_delete`) before bulk deletes and recreate after — FTS triggers cause massive performance degradation on large datasets
Batch delete operations in 5-10K chunks with checkpoint every 3 batches
Never delete from the `chunks` table while an FTS trigger is active on large datasets
Files:
scripts/kg_cleanup.py
🧠 Learnings (2)
📚 Learning: 2026-03-02T21:14:04.731Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-02T21:14:04.731Z
Learning: Applies to scripts/**/*.py : Checkpoint WAL with `PRAGMA wal_checkpoint(FULL)` before and after bulk database operations
Applied to files:
scripts/kg_cleanup.py
📚 Learning: 2026-03-02T21:14:04.731Z
Learnt from: CR
Repo: EtanHey/brainlayer PR: 0
File: CLAUDE.md:0-0
Timestamp: 2026-03-02T21:14:04.731Z
Learning: Stop enrichment workers before performing bulk database operations to avoid WAL bloat and potential freezes
Applied to files:
scripts/kg_cleanup.py
🧬 Code graph analysis (2)
scripts/kg_cleanup.py (3)
- src/brainlayer/paths.py (1): get_db_path (23-39)
- src/brainlayer/pipeline/kg_extraction.py (1): compute_entity_importance (129-191)
- src/brainlayer/vector_store.py (2): VectorStore (56-704), _read_cursor (546-548)
src/brainlayer/pipeline/kg_extraction.py (3)
- src/brainlayer/vector_store.py (1): _read_cursor (546-548)
- src/brainlayer/pipeline/entity_extraction.py (3): ExtractedRelation (34-41), ExtractionResult (45-51), extract_entities_combined (500-550)
- src/brainlayer/kg_repo.py (1): add_relation (88-142)
🔇 Additional comments (3)
src/brainlayer/pipeline/batch_extraction.py (1)
21-65: Seed expansion and `agent` category migration look good. This update is consistent with the KG quality objective and removes the previous case-duplicate seed risk.
src/brainlayer/pipeline/entity_extraction.py (1)
121-135: Prompt contract update is clear and aligned with relation normalization goals. Entity typing + direction/fact instructions are materially improved.
tests/test_kg_quality.py (1)
17-472: Comprehensive regression coverage for KG-quality guardrails. The suite meaningfully exercises coercion, direction correction, canonicalization, fact persistence, and importance scoring.
```python
for (old_type, name), new_type in ENTITY_TYPE_FIXES.items():
    rows = list(cursor.execute("SELECT id FROM kg_entities WHERE entity_type = ? AND name = ?", (old_type, name)))
    if not rows:
        continue

    entity_id = rows[0][0]
    if dry_run:
        logger.info("[DRY-RUN] Would retype %s (%s → %s)", name, old_type, new_type)
    else:
        write_cursor = store.conn.cursor()
        write_cursor.execute(
            "UPDATE kg_entities SET entity_type = ?, updated_at = datetime('now') WHERE id = ?",
            (new_type, entity_id),
        )
        logger.info("Retyped %s (%s → %s)", name, old_type, new_type)
        fixed += 1
```
Retype all matched entities, not only the first row.
`fix_entity_types` fetches all matching IDs but updates only `rows[0]`, so duplicate records remain misclassified.

🔧 Suggested fix

```diff
-    entity_id = rows[0][0]
+    entity_ids = [row[0] for row in rows]
     if dry_run:
-        logger.info("[DRY-RUN] Would retype %s (%s → %s)", name, old_type, new_type)
+        logger.info(
+            "[DRY-RUN] Would retype %s (%s → %s) [rows=%d]",
+            name,
+            old_type,
+            new_type,
+            len(entity_ids),
+        )
     else:
         write_cursor = store.conn.cursor()
-        write_cursor.execute(
-            "UPDATE kg_entities SET entity_type = ?, updated_at = datetime('now') WHERE id = ?",
-            (new_type, entity_id),
+        write_cursor.executemany(
+            "UPDATE kg_entities SET entity_type = ?, updated_at = datetime('now') WHERE id = ?",
+            [(new_type, entity_id) for entity_id in entity_ids],
         )
-        logger.info("Retyped %s (%s → %s)", name, old_type, new_type)
-        fixed += 1
+        logger.info("Retyped %s (%s → %s) [rows=%d]", name, old_type, new_type, len(entity_ids))
+        fixed += len(entity_ids)
```

🤖 Prompt for AI Agents
+ fixed += len(entity_ids)🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@scripts/kg_cleanup.py` around lines 141 - 157, The loop over
ENTITY_TYPE_FIXES in fix_entity_types only updates the first matching row
(rows[0]) so duplicates stay misclassified; modify the logic to update all
matching IDs (iterate over rows or run a single UPDATE that targets
entity_type/name) using the same write_cursor from store.conn, update updated_at
for each id (or let the bulk UPDATE set updated_at), and adjust the fixed
counter to reflect the number of rows changed; preserve dry_run behavior by
logging each would-be retype or a summary for that key.
```python
fact = raw_rel.get("fact")
props = raw_rel.get("properties") or {}
if fact and "fact" not in props:
    props["fact"] = fact
```
Guard `properties` shape before injecting fact.
If `properties` is not a dict, `props["fact"] = fact` can raise and break extraction for the whole response.

🛡️ Suggested fix

```diff
-    fact = raw_rel.get("fact")
-    props = raw_rel.get("properties") or {}
+    fact = raw_rel.get("fact")
+    raw_props = raw_rel.get("properties")
+    props = raw_props if isinstance(raw_props, dict) else {}
     if fact and "fact" not in props:
         props["fact"] = fact
```
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/pipeline/entity_extraction.py` around lines 198 - 201, The
code assumes props = raw_rel.get("properties") is a dict before doing
props["fact"] = fact; first verify the shape with isinstance(props, dict) and
only mutate it when true, otherwise create a new dict (e.g., new_props =
{"fact": fact}) and assign it back to raw_rel["properties"]; ensure you update
raw_rel (not just the local props variable) so downstream code sees the
corrected properties, using the local symbols fact, props and
raw_rel.get("properties") to locate the change.
```python
# Relation direction constraints: relation_type → (valid_source_types, valid_target_types)
# If extracted direction is wrong, we swap source/target.
_RELATION_DIRECTION_RULES: dict[str, tuple[set[str], set[str]]] = {
    "works_at": ({"person", "agent"}, {"company"}),
    "owns": ({"person"}, {"company", "project", "agent"}),
    "builds": ({"person", "agent"}, {"project", "tool", "technology"}),
    "uses": ({"person", "agent", "project", "company"}, {"tool", "technology"}),
    "coaches": ({"agent"}, {"person"}),
}
```
Add direction validation for `affiliated_with`.
`affiliated_with` is canonical and directional in the prompt contract, but it has no rule here, so reversed/invalid edges pass unvalidated.

➕ Minimal rule addition

```diff
 _RELATION_DIRECTION_RULES: dict[str, tuple[set[str], set[str]]] = {
     "works_at": ({"person", "agent"}, {"company"}),
     "owns": ({"person"}, {"company", "project", "agent"}),
     "builds": ({"person", "agent"}, {"project", "tool", "technology"}),
     "uses": ({"person", "agent", "project", "company"}, {"tool", "technology"}),
     "coaches": ({"agent"}, {"person"}),
+    "affiliated_with": ({"person"}, {"company"}),
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/pipeline/kg_extraction.py` around lines 46 - 54, The relation
"affiliated_with" is missing from _RELATION_DIRECTION_RULES so reversed/invalid
edges are not validated; add a new entry for "affiliated_with" in the
_RELATION_DIRECTION_RULES dict with the correct (valid_source_types,
valid_target_types) tuple matching the prompt contract (e.g., source should be
{"person","agent","company"} and target {"company","organization"} or whatever
canonical types your contract requires), so the pipeline code that checks
direction against _RELATION_DIRECTION_RULES will detect and swap invalid
source/target for affiliated_with.
```python
if rel.relation_type in _RELATION_DIRECTION_RULES:
    valid_src, valid_tgt = _RELATION_DIRECTION_RULES[rel.relation_type]
    if source_type not in valid_src and target_type in valid_src:
        # Wrong direction — swap
        rel.source_text, rel.target_text = rel.target_text, rel.source_text
        logger.debug(
            "Swapped relation direction: %s --%s--> %s",
            rel.source_text,
            rel.relation_type,
            rel.target_text,
        )

validated_relations.append(rel)
```
Drop relations that remain invalid after direction correction.
Current logic swaps on one reversal case, but still appends relations even when the final (source_type, target_type) pair is invalid for that relation type.

✅ Suggested fix

```diff
 if rel.relation_type in _RELATION_DIRECTION_RULES:
     valid_src, valid_tgt = _RELATION_DIRECTION_RULES[rel.relation_type]
     if source_type not in valid_src and target_type in valid_src:
         # Wrong direction — swap
         rel.source_text, rel.target_text = rel.target_text, rel.source_text
+        source_type, target_type = target_type, source_type
         logger.debug(
             "Swapped relation direction: %s --%s--> %s",
             rel.source_text,
             rel.relation_type,
             rel.target_text,
         )
+    if source_type not in valid_src or target_type not in valid_tgt:
+        logger.debug(
+            "Dropping invalid relation for %s: %s(%s) -> %s(%s)",
+            rel.relation_type,
+            rel.source_text,
+            source_type,
+            rel.target_text,
+            target_type,
+        )
+        continue
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.
In `@src/brainlayer/pipeline/kg_extraction.py` around lines 107 - 119, The code
currently swaps relation direction using _RELATION_DIRECTION_RULES but always
appends rel via validated_relations.append(rel) even if the final (source_type,
target_type) is still invalid; modify the logic in the block handling
rel.relation_type so that after a swap you also update or recompute source_type
and target_type (or swap those variables) and then check against valid_src and
valid_tgt for that rel.relation_type, only calling
validated_relations.append(rel) when the pair is valid; drop/skip the relation
when it remains invalid after correction and keep the logging of swaps in the
rel.relation_type handling.
Summary
- Validation layer in `kg_extraction.py` — catches and corrects the most common KG extraction errors:
  - Entity type coercion: `*Claude`/`*Golem` → agent, known projects → project, known tech → technology
  - Relation direction auto-swap (e.g. `company works_at person` → `person works_at company`)
  - Relations carry a `fact` field
- Expanded seed entities: `golem` → `agent` type, 13 agents, new people/projects/companies
- Data cleanup script (`scripts/kg_cleanup.py`) for fixing existing misclassifications in live DB

Addresses KG data quality issues found in dashboard testing: wrong entity types, reversed relations, self-referential loops, ad-hoc relation names, and static importance values.
Test plan
- `test_kg_quality.py` (entity coercion, direction swap, normalization, fact propagation, importance)
- `test_batch_extraction.py` (golem → agent)
- `ruff check` + `ruff format`
- `scripts/kg_cleanup.py --stats` on live DB to verify current state
- `scripts/kg_cleanup.py --apply` to fix existing data

🤖 Generated with Claude Code
Summary by CodeRabbit
New Features
Bug Fixes
Tests