Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 42 additions & 0 deletions scripts/promote_tag_entities.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/usr/bin/env python3
"""Promote high-frequency concept tags into KG entities."""

from __future__ import annotations

import argparse
import json
import sys
from pathlib import Path

sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "src"))

from brainlayer.paths import get_db_path
from brainlayer.pipeline.tag_entity_promotion import promote_tag_entities
from brainlayer.vector_store import VectorStore


def main() -> int:
    """CLI entry point: promote high-frequency chunk tags into KG entities.

    Prints the promotion stats as JSON and returns a process exit code
    (0 on success).  Writes are guarded: a non-dry run must be confirmed
    with --yes, and the target database can be overridden with --db-path,
    so a mistaken shell/env cannot silently mutate the canonical database.
    """
    parser = argparse.ArgumentParser(description="Promote high-frequency chunk tags into KG entities")
    parser.add_argument("--db-path", type=Path, default=None, help="Override DB path (defaults to get_db_path())")
    parser.add_argument("--min-count", type=int, default=500, help="Minimum tagged chunk count to promote")
    parser.add_argument("--limit", type=int, default=None, help="Optional candidate limit")
    parser.add_argument("--dry-run", action="store_true", help="Show candidates without writing")
    parser.add_argument("--yes", action="store_true", help="Confirm writes when not using --dry-run")
    args = parser.parse_args()

    # Refuse implicit writes: the operator must opt in explicitly.
    if not args.dry_run and not args.yes:
        parser.error("Refusing to write without --yes (or use --dry-run).")

    db_path = args.db_path or get_db_path()
    store = None
    try:
        store = VectorStore(db_path)
        stats = promote_tag_entities(
            store,
            min_count=args.min_count,
            limit=args.limit,
            dry_run=args.dry_run,
        )
        print(json.dumps(stats, indent=2, sort_keys=True))
        return 0
    finally:
        # Always release the DB handle, even when promotion raises.
        if store is not None:
            store.close()


if __name__ == "__main__":
    raise SystemExit(main())
225 changes: 225 additions & 0 deletions src/brainlayer/pipeline/tag_entity_promotion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
"""Promote high-frequency concept tags into KG entities."""

from __future__ import annotations

import re
from typing import Any

from ..vector_store import VectorStore

# Namespaced tag prefixes (act:, dom:, meta/) describe how a chunk was
# produced, not a real-world entity, and are excluded from promotion.
ACTIVITY_TAG_PREFIXES = ("act:", "dom:", "meta/")
# Un-namespaced activity/process tags — also excluded from promotion
# (compared case-insensitively in find_promotion_candidates).
ACTIVITY_TAGS = {
    "debugging",
    "testing",
    "refactoring",
    "code-review",
    "bug-fix",
    "feature-dev",
    "configuration",
    "documentation",
    "project-management",
    "error-handling",
    "task-management",
    "deployment",
    "workflow",
    "automation",
    "verification",
    "command-line",
    "planning",
    "tooling",
    "file-system",
    "file-management",
    "scripting",
    "monitoring",
    "assistant-action",
    "status-update",
    "version-control",
    "implementation",
    "collaboration",
    "discussion",
    "code-analysis",
    "metadata",
    "architecture",
    "styling",
    "confirmation",
    "troubleshooting",
    "design",
    "frontend",
    "backend",
    "command",
    "shell",
    "bash",
    "grep",
    "json",
    "regex",
    "html",
    "css",
    "svg",
}

# Known person slugs -> classified as "person" entities.
PERSON_TAGS = {
    "andrew-huberman",
    "avi-simon",
    "daniel-munk",
    "dor-zohar",
    "etan-heyman",
    "joshua-anderson",
    "maor-noah",
    "shachar-gerby",
    "theo-browne",
    "yuval-nir",
}

# Known product/stack slugs -> classified as "technology" entities.
TECHNOLOGY_TAGS = {
    "1password",
    "convex",
    "docker",
    "javascript",
    "nextjs",
    "openai",
    "postgres",
    "python",
    "railway",
    "react",
    "sqlite",
    "supabase",
    "telegram",
    "typescript",
    "whatsapp",
}

# Known subject-matter slugs -> classified as "topic" entities.
TOPIC_TAGS = {
    "cold-exposure",
    "dopamine",
    "exercise",
    "fitness",
    "metabolism",
    "neuroscience",
    "nutrition",
    "psychology",
    "supplements",
    "wellness",
}

# Substring markers used by classify_tag_entity_type(): Hebrew/RTL markers
# map to "topic"; community markers map to "community".
HEBREW_MARKERS = {"hebrew", "rtl", "right-to-left"}
COMMUNITY_MARKERS = {"community", "collective", "crew", "forum", "group", "guild", "network"}


def _slugify_tag(tag: str) -> str:
normalized = re.sub(r"[^a-z0-9]+", "-", tag.lower()).strip("-")
return re.sub(r"-{2,}", "-", normalized)


def classify_tag_entity_type(tag: str) -> str:
    """Infer an entity type for a promoted tag.

    The tag is slugified first so casing/punctuation variants classify
    identically.  Precedence: person > technology > community > topic.
    Returns one of "person", "technology", "community", or "topic".
    """
    normalized = _slugify_tag(tag)

    if normalized in PERSON_TAGS:
        return "person"
    if normalized in TECHNOLOGY_TAGS:
        return "technology"
    if any(marker in normalized for marker in COMMUNITY_MARKERS):
        return "community"
    # TOPIC_TAGS / HEBREW_MARKERS membership also mapped to "topic", which
    # is the fall-through default, so those checks were redundant.
    return "topic"


def find_promotion_candidates(store: VectorStore, min_count: int = 500, limit: int | None = None) -> list[dict[str, Any]]:
    """Find high-frequency concept tags worth promoting to entities.

    Tag keys are normalized in SQL with ``lower(trim(...))`` so that
    casing/whitespace variants of the same tag aggregate into a single
    candidate and match existing ``kg_entities`` names correctly, instead
    of producing split counts or missed joins.  Namespaced activity tags
    (``act:``, ``dom:``, ``meta/``) and known ACTIVITY_TAGS are excluded.

    Args:
        store: Open vector store (read cursor is used; no writes happen here).
        min_count: Minimum number of tagged chunks required to qualify.
        limit: Optional cap on the number of candidates returned.

    Returns:
        A list of ``{"tag", "count", "entity_type"}`` dicts, most frequent
        first; ``tag`` is the normalized (lowercased, trimmed) form.
    """
    cursor = store._read_cursor()
    placeholders = ", ".join("?" for _ in ACTIVITY_TAGS)
    query = f"""
        SELECT lower(trim(ct.tag)) AS normalized_tag, COUNT(*) as cnt
        FROM chunk_tags ct
        LEFT JOIN kg_entities e ON lower(e.name) = lower(trim(ct.tag))
        WHERE e.id IS NULL
          AND ct.tag IS NOT NULL
          AND trim(ct.tag) != ''
          AND ct.tag NOT LIKE 'act:%'
          AND ct.tag NOT LIKE 'dom:%'
          AND ct.tag NOT LIKE 'meta/%'
          AND lower(trim(ct.tag)) NOT IN ({placeholders})
        GROUP BY normalized_tag
        HAVING COUNT(*) >= ?
        ORDER BY cnt DESC, normalized_tag ASC
    """
    params: list[Any] = [tag.lower() for tag in sorted(ACTIVITY_TAGS)]
    params.append(min_count)
    if limit is not None:
        query += " LIMIT ?"
        params.append(limit)

    rows = list(cursor.execute(query, params))
    return [
        {
            "tag": row[0],
            "count": row[1],
            "entity_type": classify_tag_entity_type(row[0]),
        }
        for row in rows
    ]


def promote_tag_entities(
    store: VectorStore,
    min_count: int = 500,
    limit: int | None = None,
    dry_run: bool = False,
) -> dict[str, Any]:
    """Promote high-frequency tags into KG entities and link matching chunks.

    For each candidate from :func:`find_promotion_candidates`, a
    deterministic ``auto-tag-<slug>`` entity is upserted (unless an entity
    of the same type/name already exists, in which case it is reused) and
    every chunk carrying the tag is linked via ``kg_entity_chunks``.  Link
    matching uses ``lower(trim(ct.tag))`` so casing/whitespace variants of
    a tag all link to the same entity.

    NOTE(review): this function performs bulk writes but does not itself
    coordinate with concurrent writers; callers are assumed to hold the
    project's exclusive-write slot — TODO confirm against the one-writer
    orchestration guidelines.

    Args:
        store: Open vector store (written to unless ``dry_run``).
        min_count: Minimum tagged chunk count to promote.
        limit: Optional candidate cap.
        dry_run: When True, only report candidates; no writes occur.

    Returns:
        Stats dict with ``candidates``, ``entities_created``,
        ``links_created`` and ``promoted_tags``.
    """
    candidates = find_promotion_candidates(store, min_count=min_count, limit=limit)
    stats = {
        "candidates": len(candidates),
        "entities_created": 0,
        "links_created": 0,
        "promoted_tags": [candidate["tag"] for candidate in candidates],
    }
    if dry_run:
        return stats

    cursor = store.conn.cursor()
    # Older schemas lack the mention_type column; detect it once up front.
    kg_entity_chunk_cols = {row[1] for row in cursor.execute("PRAGMA table_info(kg_entity_chunks)")}
    has_mention_type = "mention_type" in kg_entity_chunk_cols

    for candidate in candidates:
        tag = candidate["tag"]
        # Normalized match key — must mirror the SQL lower(trim(...)) form.
        normalized_tag = tag.lower().strip()
        entity_type = candidate["entity_type"]
        entity_id = f"auto-tag-{_slugify_tag(tag)}"
        existing = store.get_entity_by_name(entity_type, tag)
        if existing is None:
            store.upsert_entity(
                entity_id,
                entity_type,
                tag,
                metadata={"source": "tag-promotion", "tag_count": candidate["count"]},
                confidence=0.8,
                importance=0.6,
            )
            stats["entities_created"] += 1
        else:
            # Reuse the pre-existing entity rather than creating a duplicate.
            entity_id = existing["id"]

        if has_mention_type:
            cursor.execute(
                """
                INSERT OR IGNORE INTO kg_entity_chunks (entity_id, chunk_id, relevance, context, mention_type)
                SELECT ?, ct.chunk_id, 0.8, 'tag-promotion', 'tag'
                FROM chunk_tags ct
                WHERE lower(trim(ct.tag)) = ?
                """,
                (entity_id, normalized_tag),
            )
        else:
            cursor.execute(
                """
                INSERT OR IGNORE INTO kg_entity_chunks (entity_id, chunk_id, relevance, context)
                SELECT ?, ct.chunk_id, 0.8, 'tag-promotion'
                FROM chunk_tags ct
                WHERE lower(trim(ct.tag)) = ?
                """,
                (entity_id, normalized_tag),
            )
        # INSERT OR IGNORE skips already-linked chunks; count only new rows.
        stats["links_created"] += store.conn.changes()

    return stats
6 changes: 6 additions & 0 deletions src/brainlayer/vector_store.py
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,12 @@ def _init_db(self) -> None:
("tool", "entity", "Software tool or service"),
("project", "entity", "Software project or initiative"),
("concept", "entity", "Abstract concept, pattern, or domain idea"),
("topic", "concept", "Recurring subject or thematic area"),
("protocol", "topic", "Named workflow or protocol"),
("community", "entity", "Community, audience, or social group"),
("health_metric", "topic", "Health or wellness metric"),
("workflow", "concept", "Repeatable workflow or process"),
("device", "entity", "Hardware device or machine"),
("event", "entity", "Temporal event or occurrence"),
("organization", "entity", "Company or group"),
("golem", "agent", "Specialized AI agent in the golems ecosystem"),
Expand Down
108 changes: 108 additions & 0 deletions tests/test_tag_entity_promotion.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
"""Tests for tag-to-entity promotion pipeline."""

import json

import pytest

from brainlayer.vector_store import VectorStore


@pytest.fixture
def store(tmp_path):
    """Yield a VectorStore backed by a throwaway on-disk database."""
    vector_store = VectorStore(tmp_path / "test.db")
    yield vector_store
    vector_store.close()


def _insert_chunk_with_tags(store, chunk_id, tags):
    """Insert a minimal chunks row for *chunk_id* carrying *tags* as JSON."""
    row = (chunk_id, f"content for {chunk_id}", len(chunk_id), json.dumps(tags))
    store.conn.cursor().execute(
        """INSERT INTO chunks (
            id, content, metadata, source_file, project, content_type,
            char_count, source, tags, created_at
        ) VALUES (?, ?, '{}', 'test.jsonl', 'brainlayer', 'assistant_text', ?, 'tests', ?, datetime('now'))""",
        row,
    )


class TestTagPromotionHeuristics:
    def test_classify_tag_entity_type_uses_spec_heuristics(self):
        from brainlayer.pipeline.tag_entity_promotion import classify_tag_entity_type

        # Table-driven form of the spec heuristics: tag -> expected type.
        expectations = {
            "telegram": "technology",
            "andrew-huberman": "person",
            "neuroscience": "topic",
            "hebrew-writing": "topic",
            "founders-community": "community",
            "morning-routine": "topic",
        }
        for tag, expected_type in expectations.items():
            assert classify_tag_entity_type(tag) == expected_type


class TestTagPromotionCandidates:
    def test_find_candidates_skips_existing_and_activity_tags(self, store):
        from brainlayer.pipeline.tag_entity_promotion import find_promotion_candidates

        # Two chunks share the same tags so each tag reaches min_count=2.
        shared_tags = ["telegram", "debugging", "existing-topic"]
        for chunk_id in ("chunk-1", "chunk-2"):
            _insert_chunk_with_tags(store, chunk_id, shared_tags)
        store.upsert_entity("existing-topic", "topic", "existing-topic")

        # "debugging" is an activity tag and "existing-topic" already has an
        # entity, so only "telegram" should survive.
        tags = [candidate["tag"] for candidate in find_promotion_candidates(store, min_count=2)]
        assert tags == ["telegram"]

Comment on lines +47 to +57
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick | 🔵 Trivial

Add a mixed-case tag regression test.

Current scenarios only use lowercase tags. Please add coverage for variants like ["Telegram", "telegram"] to ensure candidate counting and chunk linking are case-normalized end-to-end.

Also applies to: 60-91

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/test_tag_entity_promotion.py` around lines 47 - 57, Update the
test_find_candidates_skips_existing_and_activity_tags test (and similar tests
between lines 60-91) to include mixed-case tag variants (e.g., insert chunks
with ["Telegram", "telegram", "existing-topic"]) using the existing helper
_insert_chunk_with_tags so tags are created with differing cases, then call
find_promotion_candidates(store, min_count=2) and assert that the result
normalizes case by returning a single candidate "telegram" (lowercased) and that
existing entities inserted via store.upsert_entity("existing-topic", "topic",
"existing-topic") are still skipped; this ensures find_promotion_candidates and
the chunk/link logic are case-normalized end-to-end.


class TestTagPromotionExecution:
    def test_promote_tag_candidates_creates_entities_and_links_chunks(self, store):
        from brainlayer.pipeline.tag_entity_promotion import promote_tag_entities

        # Two tags ("telegram", "neuroscience") each appear on two chunks;
        # "feature-dev" is an activity tag and must be ignored.
        fixtures = (
            ("chunk-1", ["telegram", "feature-dev"]),
            ("chunk-2", ["telegram"]),
            ("chunk-3", ["neuroscience"]),
            ("chunk-4", ["neuroscience"]),
        )
        for chunk_id, tags in fixtures:
            _insert_chunk_with_tags(store, chunk_id, tags)

        stats = promote_tag_entities(store, min_count=2)

        assert stats["candidates"] == 2
        assert stats["entities_created"] == 2
        assert stats["links_created"] == 4

        cursor = store._read_cursor()
        type_by_name = dict(
            cursor.execute(
                "SELECT name, entity_type FROM kg_entities WHERE id LIKE 'auto-tag-%'"
            )
        )
        assert type_by_name["telegram"] == "technology"
        assert type_by_name["neuroscience"] == "topic"

        link_rows = list(
            cursor.execute(
                "SELECT entity_id, chunk_id, mention_type FROM kg_entity_chunks WHERE entity_id LIKE 'auto-tag-%'"
            )
        )
        assert len(link_rows) == 4
        assert {mention for _entity, _chunk, mention in link_rows} == {"tag"}

    def test_vector_store_seeds_new_entity_types(self, store):
        cursor = store._read_cursor()
        seeded = dict(
            cursor.execute(
                "SELECT child_type, parent_type FROM entity_type_hierarchy WHERE child_type IN (?, ?, ?, ?, ?, ?)",
                ("topic", "protocol", "community", "health_metric", "workflow", "device"),
            )
        )

        assert seeded == {
            "topic": "concept",
            "protocol": "topic",
            "community": "entity",
            "health_metric": "topic",
            "workflow": "concept",
            "device": "entity",
        }
Loading