Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
289 changes: 289 additions & 0 deletions scripts/kg_cleanup.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,289 @@
#!/usr/bin/env python3
"""KG data cleanup — fix known misclassifications and bad relations.

Fixes:
1. Entity type corrections (e.g., "Yichus" person → project)
2. Self-referential relation removal
3. Ad-hoc relation type normalization
4. Entity importance recomputation

Usage:
# Dry-run (default — shows what would change):
python3 scripts/kg_cleanup.py

# Apply changes:
python3 scripts/kg_cleanup.py --apply

# Stats only:
python3 scripts/kg_cleanup.py --stats
"""

import argparse
import logging
import sys
from pathlib import Path

# Add src to path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))

from brainlayer.paths import get_db_path
from brainlayer.pipeline.kg_extraction import CANONICAL_RELATION_TYPES, compute_entity_importance
from brainlayer.vector_store import VectorStore

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
logger = logging.getLogger(__name__)

# ── Entity type corrections ──
# Keyed by the *stored* (entity_type, name) pair in kg_entities; the value is
# the corrected entity_type to write back. Matching is exact and
# case-sensitive, hence separate "Yichus"/"yichus" entries.
ENTITY_TYPE_FIXES: dict[tuple[str, str], str] = {
    ("person", "Yichus"): "project",
    ("person", "yichus"): "project",
    # Dev platforms recorded as companies are reclassified as technologies.
    ("company", "Figma"): "technology",
    ("company", "Linear"): "technology",
    ("company", "Supabase"): "technology",
    ("company", "GitHub"): "technology",
    ("company", "GitLab"): "technology",
    # Agents misclassified as person/golem
    ("person", "Ralph"): "agent",
    ("golem", "Ralph"): "agent",
    ("person", "ClaudeGolem"): "agent",
    # golem → agent retype
    ("golem", "brainClaude"): "agent",
    ("golem", "coachClaude"): "agent",
    ("golem", "voiceClaude"): "agent",
    ("golem", "contentClaude"): "agent",
    ("golem", "golemsClaude"): "agent",
    ("golem", "recruiterGolem"): "agent",
    ("golem", "contentGolem"): "agent",
    ("golem", "tellerGolem"): "agent",
    ("golem", "ClaudeGolem"): "agent",
    ("golem", "ContentClaude"): "agent",
    ("golem", "orcClaude"): "agent",
    ("golem", "dashboardClaude"): "agent",
}

# ── Relations to delete ──
# (source_name, relation_type, target_name) triples. Matching live relations
# are soft-deleted by setting expired_at, not removed from the table.
RELATIONS_TO_DELETE: list[tuple[str, str, str]] = [
    # Self-referential
    ("brainClaude", "maintains", "brainClaude"),
    # Likely misextractions
    ("Grammy", "framework_for", "golems"),
]


def print_stats(store: VectorStore):
    """Print a KG summary: entity/relation counts by type, self-referential
    relations, importance range, and fact coverage."""
    cur = store._read_cursor()
    divider = "=" * 50

    # Entities grouped by type, most common first.
    entity_counts = list(
        cur.execute("SELECT entity_type, COUNT(*) FROM kg_entities GROUP BY entity_type ORDER BY COUNT(*) DESC")
    )
    print("\n" + divider)
    print(f"KG Entities: {sum(n for _, n in entity_counts)}")
    print(divider)
    for entity_type, n in entity_counts:
        print(f" {entity_type:15s} {n:4d}")

    # Live relations grouped by type; non-canonical types get an ✗ mark.
    relation_counts = list(
        cur.execute(
            "SELECT relation_type, COUNT(*) FROM kg_relations WHERE expired_at IS NULL GROUP BY relation_type ORDER BY COUNT(*) DESC"
        )
    )
    total_rels = sum(n for _, n in relation_counts)
    print("\n" + divider)
    print(f"KG Relations: {total_rels} ({len(relation_counts)} types)")
    print(divider)
    for relation_type, n in relation_counts:
        mark = "✓" if relation_type in CANONICAL_RELATION_TYPES else "✗"
        print(f" {mark} {relation_type:25s} {n:4d}")

    # A relation whose source and target are the same entity is always suspect.
    self_refs = list(
        cur.execute(
            """SELECT e1.name, r.relation_type, e2.name
            FROM kg_relations r
            JOIN kg_entities e1 ON r.source_id = e1.id
            JOIN kg_entities e2 ON r.target_id = e2.id
            WHERE r.source_id = r.target_id AND r.expired_at IS NULL"""
        )
    )
    if self_refs:
        print(f"\n⚠ Self-referential relations: {len(self_refs)}")
        for src, relation_type, tgt in self_refs:
            print(f" {src} --{relation_type}--> {tgt}")

    # Importance spread over entities that actually carry a score.
    importance_rows = list(
        cur.execute(
            "SELECT MIN(importance), MAX(importance), AVG(importance), COUNT(*) FROM kg_entities WHERE importance IS NOT NULL"
        )
    )
    if importance_rows and importance_rows[0][0] is not None:
        lo, hi, mean, n = importance_rows[0]
        print(f"\nImportance: min={lo:.2f} max={hi:.2f} avg={mean:.2f} ({n} entities)")

    # How many live relations carry a natural-language fact string.
    with_fact = list(
        cur.execute("SELECT COUNT(*) FROM kg_relations WHERE fact IS NOT NULL AND fact != '' AND expired_at IS NULL")
    )[0][0]
    print(f"Relations with facts: {with_fact}/{total_rels}")


def fix_entity_types(store: VectorStore, dry_run: bool = True) -> int:
    """Fix known entity type misclassifications.

    Looks up each (entity_type, name) key from ENTITY_TYPE_FIXES and retypes
    EVERY matching row (duplicates included), bumping updated_at.

    Args:
        store: Open VectorStore wrapping the KG SQLite database.
        dry_run: When True (default), only log what would change.

    Returns:
        Number of entity rows retyped (or that would be, in dry-run).
    """
    cursor = store._read_cursor()
    fixed = 0

    for (old_type, name), new_type in ENTITY_TYPE_FIXES.items():
        rows = list(
            cursor.execute(
                "SELECT id FROM kg_entities WHERE entity_type = ? AND name = ?",
                (old_type, name),
            )
        )
        if not rows:
            continue

        # Retype ALL matching rows — updating only rows[0] would leave
        # duplicate records misclassified.
        entity_ids = [row[0] for row in rows]
        if dry_run:
            logger.info(
                "[DRY-RUN] Would retype %s (%s → %s) [rows=%d]",
                name,
                old_type,
                new_type,
                len(entity_ids),
            )
        else:
            write_cursor = store.conn.cursor()
            write_cursor.executemany(
                "UPDATE kg_entities SET entity_type = ?, updated_at = datetime('now') WHERE id = ?",
                [(new_type, entity_id) for entity_id in entity_ids],
            )
            logger.info("Retyped %s (%s → %s) [rows=%d]", name, old_type, new_type, len(entity_ids))
        fixed += len(entity_ids)

    return fixed


def fix_relations(store: VectorStore, dry_run: bool = True) -> dict[str, int]:
    """Fix bad relations: delete known bad, normalize types, remove self-refs.

    Relations are soft-deleted (expired_at set) rather than removed, so the
    history remains queryable.

    Args:
        store: Open VectorStore wrapping the KG SQLite database.
        dry_run: When True (default), only log what would change.

    Returns:
        Counts per fix category: deleted_known, deleted_self_ref, normalized.
    """
    cursor = store._read_cursor()
    # One write cursor reused for all updates — no per-row cursor churn.
    write_cursor = None if dry_run else store.conn.cursor()
    stats = {"deleted_known": 0, "deleted_self_ref": 0, "normalized": 0}

    # 1. Expire explicitly blocklisted relations.
    for src_name, rtype, tgt_name in RELATIONS_TO_DELETE:
        rows = list(
            cursor.execute(
                """SELECT r.id FROM kg_relations r
                JOIN kg_entities e1 ON r.source_id = e1.id
                JOIN kg_entities e2 ON r.target_id = e2.id
                WHERE e1.name = ? AND r.relation_type = ? AND e2.name = ?
                AND r.expired_at IS NULL""",
                (src_name, rtype, tgt_name),
            )
        )
        for (rel_id,) in rows:
            if dry_run:
                logger.info("[DRY-RUN] Would delete relation: %s --%s--> %s", src_name, rtype, tgt_name)
            else:
                write_cursor.execute(
                    "UPDATE kg_relations SET expired_at = datetime('now') WHERE id = ?",
                    (rel_id,),
                )
                logger.info("Expired relation: %s --%s--> %s", src_name, rtype, tgt_name)
            stats["deleted_known"] += 1

    # 2. Expire self-referential relations (source == target is never valid).
    self_refs = list(
        cursor.execute("SELECT id FROM kg_relations WHERE source_id = target_id AND expired_at IS NULL")
    )
    for (rel_id,) in self_refs:
        if dry_run:
            logger.info("[DRY-RUN] Would expire self-referential relation %s", rel_id)
        else:
            write_cursor.execute(
                "UPDATE kg_relations SET expired_at = datetime('now') WHERE id = ?",
                (rel_id,),
            )
            logger.info("Expired self-referential relation %s", rel_id)
        stats["deleted_self_ref"] += 1

    # 3. Normalize non-canonical relation types to the generic related_to.
    # Parameterized NOT IN — building SQL by interpolating the type strings
    # would break on any value containing a quote.
    canonical = sorted(CANONICAL_RELATION_TYPES)
    placeholders = ",".join("?" * len(canonical))
    non_canonical = list(
        cursor.execute(
            f"SELECT id, relation_type FROM kg_relations WHERE expired_at IS NULL AND relation_type NOT IN ({placeholders})",
            canonical,
        )
    )
    for rel_id, old_type in non_canonical:
        if dry_run:
            logger.info("[DRY-RUN] Would normalize relation type %s → related_to", old_type)
        else:
            write_cursor.execute(
                "UPDATE kg_relations SET relation_type = 'related_to' WHERE id = ?",
                (rel_id,),
            )
            logger.info("Normalized relation type %s → related_to", old_type)
        stats["normalized"] += 1

    return stats


def main():
    """CLI entry point: parse flags, run the cleanup passes, print stats.

    Default mode is dry-run; --apply commits changes, --stats only reports.
    """
    parser = argparse.ArgumentParser(description="KG data cleanup")
    parser.add_argument("--apply", action="store_true", help="Apply changes (default: dry-run)")
    parser.add_argument("--stats", action="store_true", help="Print stats only")
    parser.add_argument("--importance", action="store_true", help="Recompute entity importance")
    args = parser.parse_args()

    db_path = get_db_path()
    logger.info("Using DB: %s", db_path)
    store = VectorStore(db_path)

    dry_run = not args.apply
    try:
        if args.stats:
            print_stats(store)
            return

        if dry_run:
            logger.info("=== DRY-RUN MODE (use --apply to commit changes) ===")
        else:
            # Flush the WAL before mutating so we start from a clean checkpoint.
            store.conn.cursor().execute("PRAGMA wal_checkpoint(FULL)")
            logger.info("WAL checkpoint (pre) done")

        # Fix entity types
        logger.info("\n--- Entity Type Fixes ---")
        entity_fixes = fix_entity_types(store, dry_run=dry_run)
        logger.info("Entity type fixes: %d", entity_fixes)

        # Fix relations
        logger.info("\n--- Relation Fixes ---")
        rel_stats = fix_relations(store, dry_run=dry_run)
        logger.info(
            "Relation fixes: %d deleted (known), %d deleted (self-ref), %d normalized",
            rel_stats["deleted_known"],
            rel_stats["deleted_self_ref"],
            rel_stats["normalized"],
        )

        # Recompute importance (always after --apply, since retypes/expiries
        # change the graph the scores are derived from).
        if args.importance or args.apply:
            logger.info("\n--- Recomputing Entity Importance ---")
            if dry_run:
                logger.info("[DRY-RUN] Would recompute entity importance")
            else:
                updated = compute_entity_importance(store)
                logger.info("Updated importance for %d entities", updated)

        if not dry_run:
            store.conn.cursor().execute("PRAGMA wal_checkpoint(FULL)")
            logger.info("WAL checkpoint (post) done")

        print_stats(store)
    finally:
        # Close the DB handle even if a cleanup pass raised.
        store.close()

    if dry_run:
        logger.info("\nTo apply changes: python3 scripts/kg_cleanup.py --apply")


if __name__ == "__main__":
    main()
21 changes: 19 additions & 2 deletions src/brainlayer/pipeline/batch_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,11 @@
"Avi Simon",
"Yuval Nir",
"Daniel Munk",
"Andrew Huberman",
"Joshua Anderson",
"Theo Browne",
],
"company": ["Cantaloupe AI", "Domica", "MeHayom", "ProductDZ", "Weby"],
"company": ["Cantaloupe AI", "Domica", "MeHayom", "ProductDZ", "Weby", "Union"],
"project": [
"brainlayer",
"voicelayer",
Expand All @@ -37,13 +40,27 @@
"domica",
"rudy-monorepo",
"union",
"orchestrator",
"etanheyman.com",
"golem-profiles",
"6pm",
"6pm-mini",
"soltome",
"yichus",
],
"golem": [
"agent": [
"golemsClaude",
"brainClaude",
"voiceClaude",
"coachClaude",
"contentClaude",
"Ralph",
"ClaudeGolem",
"recruiterGolem",
"contentGolem",
"tellerGolem",
"orcClaude",
Comment thread
coderabbitai[bot] marked this conversation as resolved.
"dashboardClaude",
],
}

Expand Down
24 changes: 18 additions & 6 deletions src/brainlayer/pipeline/entity_extraction.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,20 @@ def _deduplicate_overlaps(entities: list[ExtractedEntity]) -> list[ExtractedEnti

# ── LLM-based extraction ──

_NER_PROMPT_TEMPLATE = """Extract named entities and relationships from this text.
_NER_PROMPT_TEMPLATE = """Extract named entities and relationships from this developer conversation text.

Entity types: person, company, project, golem, tool, topic
Relation types: works_at, owns, builds, uses, client_of, mentioned_in
Entity types: person, agent, company, project, tool, technology, topic
- person: Human names (First Last). NOT repos/tools/agents.
- agent: AI agents (*Claude, *Golem, Ralph). NOT humans.
- company: Businesses. project: Code repos/apps. tool/technology: Dev tools, languages, frameworks.

Return JSON only, no explanation:
{{"entities": [{{"text": "exact text from input", "type": "entity_type"}}], "relations": [{{"source": "entity text", "target": "entity text", "type": "relation_type"}}]}}
Relation types (direction: source → target):
- works_at: person → company. owns: person → project/company. builds: person/agent → project.
- uses: entity → tool/technology. client_of: A → B (B serves A). affiliated_with: person → company.
- coaches: agent → person. related_to: generic fallback.

Return JSON only:
{{"entities": [{{"text": "exact text from input", "type": "entity_type"}}], "relations": [{{"source": "entity text", "target": "entity text", "type": "relation_type", "fact": "natural language sentence"}}]}}

Comment thread
coderabbitai[bot] marked this conversation as resolved.
If no entities found, return: {{"entities": [], "relations": []}}

Expand Down Expand Up @@ -188,13 +195,18 @@ def parse_llm_ner_response(response: str, source_text: str) -> tuple[list[Extrac
if not source or not target or not rtype:
continue

fact = raw_rel.get("fact")
props = raw_rel.get("properties") or {}
if fact and "fact" not in props:
props["fact"] = fact
Comment on lines +198 to +201
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Guard properties shape before injecting fact.

If properties is not a dict, props["fact"] = fact can raise and break extraction for the whole response.

🛡️ Suggested fix
-        fact = raw_rel.get("fact")
-        props = raw_rel.get("properties") or {}
+        fact = raw_rel.get("fact")
+        raw_props = raw_rel.get("properties")
+        props = raw_props if isinstance(raw_props, dict) else {}
         if fact and "fact" not in props:
             props["fact"] = fact
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
fact = raw_rel.get("fact")
props = raw_rel.get("properties") or {}
if fact and "fact" not in props:
props["fact"] = fact
fact = raw_rel.get("fact")
raw_props = raw_rel.get("properties")
props = raw_props if isinstance(raw_props, dict) else {}
if fact and "fact" not in props:
props["fact"] = fact
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@src/brainlayer/pipeline/entity_extraction.py` around lines 198 - 201, The
code assumes props = raw_rel.get("properties") is a dict before doing
props["fact"] = fact; first verify the shape with isinstance(props, dict) and
only mutate it when true, otherwise create a new dict (e.g., new_props =
{"fact": fact}) and assign it back to raw_rel["properties"]; ensure you update
raw_rel (not just the local props variable) so downstream code sees the
corrected properties, using the local symbols fact, props and
raw_rel.get("properties") to locate the change.


relations.append(
ExtractedRelation(
source_text=source,
target_text=target,
relation_type=rtype,
confidence=0.7,
properties=raw_rel.get("properties", {}),
properties=props,
)
)

Expand Down
Loading