From db9fb56e1df829cafca2145d25c9a1b7ad899c5f Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Fri, 27 Feb 2026 11:26:54 +0200 Subject: [PATCH] =?UTF-8?q?feat:=20KG=20standard=20tables=20=E2=80=94=20ma?= =?UTF-8?q?tches=20Convex=20kgSpec.ts?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add standardized knowledge graph schema to BrainLayer's SQLite: - kg_entities: canonical_name, description, confidence, importance, valid_from/until, group_id - kg_relations: fact, importance, valid_from/until, expired_at, source_chunk_id - kg_entity_chunks: mention_type (explicit/inferred/embedding_match) - kg_current_facts VIEW (auto-filters expired relations) - New methods: soft_close_relation, get_current_facts, traverse (2-hop CTE), resolve_entity - Shared constants: ENTITY_TYPES, RELATION_TYPES, DECAY_CONSTANTS, effective_score() - 41 new tests in test_kg_standard.py, all backward compatible Co-Authored-By: Claude Opus 4.6 --- src/brainlayer/kg/__init__.py | 57 ++++ src/brainlayer/vector_store.py | 419 +++++++++++++++++++++++++++-- tests/test_kg_schema.py | 23 +- tests/test_kg_standard.py | 473 +++++++++++++++++++++++++++++++++ 4 files changed, 937 insertions(+), 35 deletions(-) create mode 100644 src/brainlayer/kg/__init__.py create mode 100644 tests/test_kg_standard.py diff --git a/src/brainlayer/kg/__init__.py b/src/brainlayer/kg/__init__.py new file mode 100644 index 0000000..b2b62a7 --- /dev/null +++ b/src/brainlayer/kg/__init__.py @@ -0,0 +1,57 @@ +"""BrainLayer Knowledge Graph — standardized KG spec (matches Convex kgSpec.ts).""" + +import math +from typing import Optional + +# ── Shared Constants (must match kgSpec.ts in 6PM) ────────────────────────── + +ENTITY_TYPES = [ + "person", + "constraint", + "preference", + "life_event", + "meeting", + "location", + "organization", +] + +RELATION_TYPES = [ + "has_constraint", + "has_preference", + "blocked_during", + "attended", + "organized_by", + "knows", + "works_at", + 
"supersedes", + "held_at", +] + +# Half-life decay constants (lambda) for time-based relevance scoring +DECAY_CONSTANTS: dict[str, float] = { + "constraint": 0.0019, # ~365 day half-life + "preference": 0.0077, # ~90 day half-life + "life_event": 0, # date-bounded, no decay + "casual": 0.0231, # ~30 day half-life + "meeting": 0.0046, # ~150 day half-life +} + + +def effective_score( + confidence: float, + importance: float, + age_days: float, + entity_type: Optional[str] = None, +) -> float: + """Calculate time-decayed effective score for a KG entity or relation. + + Score = confidence * importance * exp(-lambda * age_days) + + Args: + confidence: How certain we are about this fact (0-1) + importance: How important this fact is (0-1) + age_days: Age in days since creation + entity_type: Entity type for decay rate lookup (defaults to 'preference' rate) + """ + lam = DECAY_CONSTANTS.get(entity_type or "", 0.0077) + return confidence * importance * math.exp(-lam * age_days) diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index a42a81b..aaf78ab 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -504,6 +504,59 @@ def _init_db(self) -> None: """) cursor.execute("CREATE INDEX IF NOT EXISTS idx_kg_alias_entity ON kg_entity_aliases(entity_id)") + # ── KG Standard Spec Migrations (matches Convex kgSpec.ts) ────────── + + # Add new standard columns to kg_entities + for col, default in [ + ("canonical_name", "TEXT"), + ("description", "TEXT"), + ("confidence", "REAL DEFAULT 1.0"), + ("importance", "REAL DEFAULT 0.5"), + ("valid_from", "TEXT"), + ("valid_until", "TEXT"), + ("group_id", "TEXT"), + ]: + if col not in kg_entity_cols: + cursor.execute(f"ALTER TABLE kg_entities ADD COLUMN {col} {default}") + + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_kg_entities_canonical ON kg_entities(canonical_name, entity_type)" + ) + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_kg_entities_valid ON 
kg_entities(valid_from, valid_until)" + ) + + # Add new standard columns to kg_relations + for col, default in [ + ("fact", "TEXT"), + ("importance", "REAL DEFAULT 0.5"), + ("valid_from", "TEXT"), + ("valid_until", "TEXT"), + ("expired_at", "TEXT"), + ("source_chunk_id", "TEXT"), + ]: + if col not in kg_rel_cols: + cursor.execute(f"ALTER TABLE kg_relations ADD COLUMN {col} {default}") + + cursor.execute( + "CREATE INDEX IF NOT EXISTS idx_kg_relations_validity ON kg_relations(valid_from, valid_until)" + ) + + # Add mention_type to kg_entity_chunks + ec_cols = {row[1] for row in cursor.execute("PRAGMA table_info(kg_entity_chunks)")} + if "mention_type" not in ec_cols: + cursor.execute("ALTER TABLE kg_entity_chunks ADD COLUMN mention_type TEXT") + + # kg_current_facts view — auto-filters expired relations + cursor.execute("DROP VIEW IF EXISTS kg_current_facts") + cursor.execute(""" + CREATE VIEW kg_current_facts AS + SELECT * FROM kg_relations + WHERE (valid_from IS NULL OR valid_from <= strftime('%Y-%m-%dT%H:%M:%fZ','now')) + AND (valid_until IS NULL OR valid_until >= strftime('%Y-%m-%dT%H:%M:%fZ','now')) + AND expired_at IS NULL + """) + # ── End Knowledge Graph tables ────────────────────────────────────── # Check if FTS5 needs backfill (existing DB without FTS5 data) @@ -2270,21 +2323,53 @@ def upsert_entity( name: str, metadata: Optional[Dict] = None, embedding: Optional[List[float]] = None, + *, + canonical_name: Optional[str] = None, + description: Optional[str] = None, + confidence: Optional[float] = None, + importance: Optional[float] = None, + valid_from: Optional[str] = None, + valid_until: Optional[str] = None, + group_id: Optional[str] = None, ) -> str: - """Insert or update a KG entity. Returns the entity ID.""" + """Insert or update a KG entity. Returns the entity ID. 
+ + Standard fields (matching Convex kgSpec.ts): + canonical_name: Lowercased canonical form (auto-derived from name if not set) + description: Human-readable description + confidence: How certain we are (0-1, default 1.0) + importance: How important (0-1, default 0.5) + valid_from/valid_until: ISO 8601 temporal validity + group_id: Multi-tenant namespace + """ cursor = self.conn.cursor() meta_json = json.dumps(metadata or {}) now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + canon = canonical_name or name.lower().replace(" ", "_") + conf = confidence if confidence is not None else 1.0 + imp = importance if importance is not None else 0.5 cursor.execute( """ - INSERT INTO kg_entities (id, entity_type, name, metadata, created_at, updated_at) - VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO kg_entities (id, entity_type, name, metadata, canonical_name, + description, confidence, importance, + valid_from, valid_until, group_id, + created_at, updated_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(entity_type, name) DO UPDATE SET metadata = excluded.metadata, + canonical_name = excluded.canonical_name, + description = excluded.description, + confidence = excluded.confidence, + importance = excluded.importance, + valid_from = COALESCE(excluded.valid_from, kg_entities.valid_from), + valid_until = COALESCE(excluded.valid_until, kg_entities.valid_until), + group_id = COALESCE(excluded.group_id, kg_entities.group_id), updated_at = excluded.updated_at """, - (entity_id, entity_type, name, meta_json, now, now), + (entity_id, entity_type, name, meta_json, canon, + description, conf, imp, + valid_from, valid_until, group_id, now, now), ) # Retrieve the actual stored ID (may differ from entity_id on conflict) @@ -2312,20 +2397,40 @@ def add_relation( relation_type: str, properties: Optional[Dict] = None, confidence: float = 1.0, + *, + fact: Optional[str] = None, + importance: float = 0.5, + valid_from: Optional[str] = None, + valid_until: Optional[str] = None, + source_chunk_id: Optional[str] = None, ) -> str: - """Add a relationship between two entities. Returns the relation ID.""" + """Add a relationship between two entities. Returns the relation ID. + + Standard fields (matching Convex kgSpec.ts): + fact: Natural language description (e.g., "Alice does not meet on Fridays") + importance: How important (0-1, default 0.5) + valid_from/valid_until: ISO 8601 temporal validity + source_chunk_id: Provenance — which conversation chunk this came from + """ cursor = self.conn.cursor() props_json = json.dumps(properties or {}) cursor.execute( """ - INSERT INTO kg_relations (id, source_id, target_id, relation_type, properties, confidence) - VALUES (?, ?, ?, ?, ?, ?) + INSERT INTO kg_relations (id, source_id, target_id, relation_type, properties, confidence, + fact, importance, valid_from, valid_until, source_chunk_id) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
ON CONFLICT(source_id, target_id, relation_type) DO UPDATE SET properties = excluded.properties, - confidence = excluded.confidence + confidence = excluded.confidence, + fact = COALESCE(excluded.fact, kg_relations.fact), + importance = COALESCE(excluded.importance, kg_relations.importance), + valid_from = COALESCE(excluded.valid_from, kg_relations.valid_from), + valid_until = COALESCE(excluded.valid_until, kg_relations.valid_until), + source_chunk_id = COALESCE(excluded.source_chunk_id, kg_relations.source_chunk_id) """, - (relation_id, source_id, target_id, relation_type, props_json, confidence), + (relation_id, source_id, target_id, relation_type, props_json, confidence, + fact, importance, valid_from, valid_until, source_chunk_id), ) # Retrieve the actual stored ID (may differ from relation_id on conflict) @@ -2343,18 +2448,25 @@ def link_entity_chunk( chunk_id: str, relevance: float = 1.0, context: Optional[str] = None, + *, + mention_type: Optional[str] = None, ) -> None: - """Link an entity to an existing chunk.""" + """Link an entity to an existing chunk. + + Args: + mention_type: How entity was found — 'explicit', 'inferred', 'embedding_match' + """ cursor = self.conn.cursor() cursor.execute( """ - INSERT INTO kg_entity_chunks (entity_id, chunk_id, relevance, context) - VALUES (?, ?, ?, ?) + INSERT INTO kg_entity_chunks (entity_id, chunk_id, relevance, context, mention_type) + VALUES (?, ?, ?, ?, ?) 
ON CONFLICT(entity_id, chunk_id) DO UPDATE SET relevance = excluded.relevance, - context = excluded.context + context = excluded.context, + mention_type = COALESCE(excluded.mention_type, kg_entity_chunks.mention_type) """, - (entity_id, chunk_id, relevance, context), + (entity_id, chunk_id, relevance, context, mention_type), ) def get_entity(self, entity_id: str) -> Optional[Dict[str, Any]]: @@ -2362,7 +2474,10 @@ def get_entity(self, entity_id: str) -> Optional[Dict[str, Any]]: cursor = self._read_cursor() rows = list( cursor.execute( - "SELECT id, entity_type, name, metadata, created_at, updated_at FROM kg_entities WHERE id = ?", + """SELECT id, entity_type, name, metadata, created_at, updated_at, + canonical_name, description, confidence, importance, + valid_from, valid_until, group_id + FROM kg_entities WHERE id = ?""", (entity_id,), ) ) @@ -2376,6 +2491,13 @@ def get_entity(self, entity_id: str) -> Optional[Dict[str, Any]]: "metadata": json.loads(row[3]) if row[3] else {}, "created_at": row[4], "updated_at": row[5], + "canonical_name": row[6], + "description": row[7], + "confidence": row[8], + "importance": row[9], + "valid_from": row[10], + "valid_until": row[11], + "group_id": row[12], } def get_entity_by_name(self, entity_type: str, name: str) -> Optional[Dict[str, Any]]: @@ -2383,7 +2505,10 @@ def get_entity_by_name(self, entity_type: str, name: str) -> Optional[Dict[str, cursor = self._read_cursor() rows = list( cursor.execute( - "SELECT id, entity_type, name, metadata, created_at, updated_at FROM kg_entities WHERE entity_type = ? AND name = ?", + """SELECT id, entity_type, name, metadata, created_at, updated_at, + canonical_name, description, confidence, importance, + valid_from, valid_until, group_id + FROM kg_entities WHERE entity_type = ? 
AND name = ?""", (entity_type, name), ) ) @@ -2397,6 +2522,13 @@ def get_entity_by_name(self, entity_type: str, name: str) -> Optional[Dict[str, "metadata": json.loads(row[3]) if row[3] else {}, "created_at": row[4], "updated_at": row[5], + "canonical_name": row[6], + "description": row[7], + "confidence": row[8], + "importance": row[9], + "valid_from": row[10], + "valid_until": row[11], + "group_id": row[12], } def get_entity_relations(self, entity_id: str, direction: str = "both") -> List[Dict[str, Any]]: @@ -2414,7 +2546,8 @@ def get_entity_relations(self, entity_id: str, direction: str = "both") -> List[ cursor.execute( """ SELECT r.id, r.source_id, r.target_id, r.relation_type, r.properties, r.confidence, - e.name as target_name, e.entity_type as target_type + e.name as target_name, e.entity_type as target_type, + r.fact, r.importance, r.valid_from, r.valid_until, r.expired_at, r.source_chunk_id FROM kg_relations r JOIN kg_entities e ON r.target_id = e.id WHERE r.source_id = ? @@ -2433,6 +2566,12 @@ def get_entity_relations(self, entity_id: str, direction: str = "both") -> List[ "confidence": row[5], "target_name": row[6], "target_type": row[7], + "fact": row[8], + "importance": row[9], + "valid_from": row[10], + "valid_until": row[11], + "expired_at": row[12], + "source_chunk_id": row[13], "direction": "outgoing", } ) @@ -2442,7 +2581,8 @@ def get_entity_relations(self, entity_id: str, direction: str = "both") -> List[ cursor.execute( """ SELECT r.id, r.source_id, r.target_id, r.relation_type, r.properties, r.confidence, - e.name as source_name, e.entity_type as source_type + e.name as source_name, e.entity_type as source_type, + r.fact, r.importance, r.valid_from, r.valid_until, r.expired_at, r.source_chunk_id FROM kg_relations r JOIN kg_entities e ON r.source_id = e.id WHERE r.target_id = ? 
@@ -2461,6 +2601,12 @@ def get_entity_relations(self, entity_id: str, direction: str = "both") -> List[ "confidence": row[5], "source_name": row[6], "source_type": row[7], + "fact": row[8], + "importance": row[9], + "valid_from": row[10], + "valid_until": row[11], + "expired_at": row[12], + "source_chunk_id": row[13], "direction": "incoming", } ) @@ -2473,7 +2619,7 @@ def get_entity_chunks(self, entity_id: str, limit: int = 20) -> List[Dict[str, A rows = list( cursor.execute( """ - SELECT ec.chunk_id, ec.relevance, ec.context, + SELECT ec.chunk_id, ec.relevance, ec.context, ec.mention_type, c.content, c.source_file, c.project, c.content_type, c.created_at FROM kg_entity_chunks ec JOIN chunks c ON ec.chunk_id = c.id @@ -2489,11 +2635,12 @@ def get_entity_chunks(self, entity_id: str, limit: int = 20) -> List[Dict[str, A "chunk_id": row[0], "relevance": row[1], "context": row[2], - "content": row[3], - "source_file": row[4], - "project": row[5], - "content_type": row[6], - "created_at": row[7], + "mention_type": row[3], + "content": row[4], + "source_file": row[5], + "project": row[6], + "content_type": row[7], + "created_at": row[8], } for row in rows ] @@ -2683,6 +2830,230 @@ def get_entity_aliases(self, entity_id: str) -> List[Dict[str, Any]]: ) return [{"alias": row[0], "alias_type": row[1], "created_at": row[2]} for row in rows] + # ── KG Standard Methods (matches Convex kgSpec.ts) ────────────────── + + def soft_close_relation(self, relation_id: str) -> None: + """Soft-close a relation by setting expired_at to now. + + The relation is not deleted but becomes invisible to kg_current_facts. + """ + cursor = self.conn.cursor() + now = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%fZ") + cursor.execute( + "UPDATE kg_relations SET expired_at = ? 
WHERE id = ?", + (now, relation_id), + ) + + def get_current_facts( + self, + entity_id: str, + relation_type: Optional[str] = None, + ) -> List[Dict[str, Any]]: + """Get non-expired relations for an entity (outgoing only). + + Uses the kg_current_facts VIEW which auto-filters by validity and expiration. + """ + cursor = self._read_cursor() + if relation_type: + rows = list( + cursor.execute( + """ + SELECT f.id, f.source_id, f.target_id, f.relation_type, f.properties, f.confidence, + f.fact, f.importance, f.valid_from, f.valid_until, f.expired_at, f.source_chunk_id, + e.name as target_name, e.entity_type as target_type + FROM kg_current_facts f + JOIN kg_entities e ON f.target_id = e.id + WHERE f.source_id = ? AND f.relation_type = ? + """, + (entity_id, relation_type), + ) + ) + else: + rows = list( + cursor.execute( + """ + SELECT f.id, f.source_id, f.target_id, f.relation_type, f.properties, f.confidence, + f.fact, f.importance, f.valid_from, f.valid_until, f.expired_at, f.source_chunk_id, + e.name as target_name, e.entity_type as target_type + FROM kg_current_facts f + JOIN kg_entities e ON f.target_id = e.id + WHERE f.source_id = ? + """, + (entity_id,), + ) + ) + return [ + { + "id": row[0], + "source_id": row[1], + "target_id": row[2], + "relation_type": row[3], + "properties": json.loads(row[4]) if row[4] else {}, + "confidence": row[5], + "fact": row[6], + "importance": row[7], + "valid_from": row[8], + "valid_until": row[9], + "expired_at": row[10], + "source_chunk_id": row[11], + "target_name": row[12], + "target_type": row[13], + } + for row in rows + ] + + def traverse( + self, + entity_id: str, + max_depth: int = 2, + relation_types: Optional[List[str]] = None, + ) -> List[Dict[str, Any]]: + """Multi-hop graph traversal via recursive CTE. + + Returns entities reachable from entity_id within max_depth hops, + following non-expired relations. Avoids cycles. 
+        """
+        cursor = self._read_cursor()
+
+        rel_filter = ""
+        params: list = [entity_id, max_depth]
+        if relation_types:
+            placeholders = ", ".join("?" for _ in relation_types)
+            rel_filter = f"AND r.relation_type IN ({placeholders})"
+            params = [entity_id] + relation_types + [max_depth]
+
+        # Build the recursive CTE query
+        if relation_types:
+            query = f"""
+                WITH RECURSIVE reachable(entity_id, depth, path) AS (
+                    SELECT ?, 0, '|' || ? || '|'
+                    UNION ALL
+                    SELECT r.target_id, re.depth + 1,
+                           re.path || r.target_id || '|'
+                    FROM reachable re
+                    JOIN kg_relations r ON r.source_id = re.entity_id
+                    WHERE re.depth < ?
+                    AND r.expired_at IS NULL
+                    AND re.path NOT LIKE '%|' || r.target_id || '|%'
+                    {rel_filter}
+                )
+                SELECT DISTINCT r.entity_id, r.depth, e.entity_type, e.name
+                FROM reachable r
+                JOIN kg_entities e ON r.entity_id = e.id
+                WHERE r.depth > 0
+                ORDER BY r.depth, e.name
+            """
+            params_list = [entity_id, entity_id, max_depth] + relation_types
+        else:
+            query = """
+                WITH RECURSIVE reachable(entity_id, depth, path) AS (
+                    SELECT ?, 0, '|' || ? || '|'
+                    UNION ALL
+                    SELECT r.target_id, re.depth + 1,
+                           re.path || r.target_id || '|'
+                    FROM reachable re
+                    JOIN kg_relations r ON r.source_id = re.entity_id
+                    WHERE re.depth < ?
+                    AND r.expired_at IS NULL
+                    AND re.path NOT LIKE '%|' || r.target_id || '|%'
+                )
+                SELECT DISTINCT r.entity_id, r.depth, e.entity_type, e.name
+                FROM reachable r
+                JOIN kg_entities e ON r.entity_id = e.id
+                WHERE r.depth > 0
+                ORDER BY r.depth, e.name
+            """
+            params_list = [entity_id, entity_id, max_depth]
+
+        rows = list(cursor.execute(query, params_list))
+        return [
+            {
+                "entity_id": row[0],
+                "depth": row[1],
+                "entity_type": row[2],
+                "name": row[3],
+            }
+            for row in rows
+        ]
+
+    def resolve_entity(self, name_or_alias: str) -> Optional[Dict[str, Any]]:
+        """Resolve a string to a KG entity.
+
+        Resolution order:
+        1. Exact alias match (case-insensitive)
+        2. Exact name match (case-insensitive)
+        3. Canonical name match
+        4. 
FTS5 fuzzy search (first result) + """ + # 1. Exact alias + result = self.get_entity_by_alias(name_or_alias) + if result: + return result + + # 2. Exact name (case-insensitive) + cursor = self._read_cursor() + rows = list( + cursor.execute( + """SELECT id, entity_type, name, metadata, created_at, updated_at, + canonical_name, description, confidence, importance, + valid_from, valid_until, group_id + FROM kg_entities WHERE LOWER(name) = LOWER(?)""", + (name_or_alias,), + ) + ) + if rows: + row = rows[0] + return { + "id": row[0], + "entity_type": row[1], + "name": row[2], + "metadata": json.loads(row[3]) if row[3] else {}, + "created_at": row[4], + "updated_at": row[5], + "canonical_name": row[6], + "description": row[7], + "confidence": row[8], + "importance": row[9], + "valid_from": row[10], + "valid_until": row[11], + "group_id": row[12], + } + + # 3. Canonical name match + rows = list( + cursor.execute( + """SELECT id, entity_type, name, metadata, created_at, updated_at, + canonical_name, description, confidence, importance, + valid_from, valid_until, group_id + FROM kg_entities WHERE LOWER(canonical_name) = LOWER(?)""", + (name_or_alias,), + ) + ) + if rows: + row = rows[0] + return { + "id": row[0], + "entity_type": row[1], + "name": row[2], + "metadata": json.loads(row[3]) if row[3] else {}, + "created_at": row[4], + "updated_at": row[5], + "canonical_name": row[6], + "description": row[7], + "confidence": row[8], + "importance": row[9], + "valid_from": row[10], + "valid_until": row[11], + "group_id": row[12], + } + + # 4. 
FTS5 fuzzy fallback + results = self.search_entities(name_or_alias, limit=1) + if results: + return self.get_entity(results[0]["id"]) + + return None + def close(self) -> None: """Close database connections.""" if hasattr(self, "read_conn"): diff --git a/tests/test_kg_schema.py b/tests/test_kg_schema.py index dc6cc99..88c1fdc 100644 --- a/tests/test_kg_schema.py +++ b/tests/test_kg_schema.py @@ -74,26 +74,27 @@ def test_kg_entities_fts_virtual_table_exists(self, store): def test_kg_entities_columns(self, store): cursor = store._read_cursor() cols = {row[1] for row in cursor.execute("PRAGMA table_info(kg_entities)")} - assert cols == {"id", "entity_type", "name", "metadata", "created_at", "updated_at", "user_verified"} + expected = { + "id", "entity_type", "name", "metadata", "created_at", "updated_at", "user_verified", + "canonical_name", "description", "confidence", "importance", + "valid_from", "valid_until", "group_id", + } + assert cols == expected def test_kg_relations_columns(self, store): cursor = store._read_cursor() cols = {row[1] for row in cursor.execute("PRAGMA table_info(kg_relations)")} - assert cols == { - "id", - "source_id", - "target_id", - "relation_type", - "properties", - "confidence", - "created_at", - "user_verified", + expected = { + "id", "source_id", "target_id", "relation_type", "properties", + "confidence", "created_at", "user_verified", + "fact", "importance", "valid_from", "valid_until", "expired_at", "source_chunk_id", } + assert cols == expected def test_kg_entity_chunks_columns(self, store): cursor = store._read_cursor() cols = {row[1] for row in cursor.execute("PRAGMA table_info(kg_entity_chunks)")} - assert cols == {"entity_id", "chunk_id", "relevance", "context"} + assert cols == {"entity_id", "chunk_id", "relevance", "context", "mention_type"} def test_source_project_id_column_on_chunks(self, store): cursor = store._read_cursor() diff --git a/tests/test_kg_standard.py b/tests/test_kg_standard.py new file mode 100644 index 
0000000..af7701d --- /dev/null +++ b/tests/test_kg_standard.py @@ -0,0 +1,473 @@ +"""Tests for KG Standard Tables — standardized spec matching Convex kgSpec.ts. + +Tests cover: +1. New columns on kg_entities (canonical_name, description, confidence, importance, valid_from/until, group_id) +2. New columns on kg_relations (fact, importance, valid_from/until, expired_at, source_chunk_id) +3. New column on kg_entity_chunks (mention_type) +4. kg_current_facts VIEW +5. Soft-close relations (expired_at) +6. 2-hop graph traversal via recursive CTE +7. Entity resolution (exact alias → fuzzy name) +8. effective_score decay function +9. Shared constants match spec +10. Backward compatibility — existing CRUD still works +""" + +import pytest + +from brainlayer.kg import DECAY_CONSTANTS, ENTITY_TYPES, RELATION_TYPES, effective_score +from brainlayer.vector_store import VectorStore + +# ── Fixtures ──────────────────────────────────────────────────── + + +@pytest.fixture +def store(tmp_path): + """Create a fresh VectorStore for testing.""" + db_path = tmp_path / "test.db" + s = VectorStore(db_path) + yield s + s.close() + + +@pytest.fixture +def populated_store(store): + """Store with a small graph: 3 entities, 2 relations.""" + store.upsert_entity( + "person-1", + "person", + "Etan Heyman", + canonical_name="etan_heyman", + description="Software developer", + confidence=0.95, + importance=0.8, + group_id="default", + ) + store.upsert_entity( + "org-1", + "organization", + "Cantaloupe", + canonical_name="cantaloupe", + description="Tech company", + ) + store.upsert_entity( + "meeting-1", + "meeting", + "Weekly Standup", + canonical_name="weekly_standup", + valid_from="2026-02-27T09:00:00Z", + valid_until="2026-02-27T10:00:00Z", + ) + store.add_relation( + "rel-1", + "person-1", + "org-1", + "works_at", + fact="Etan works at Cantaloupe", + importance=0.9, + ) + store.add_relation( + "rel-2", + "person-1", + "meeting-1", + "attended", + fact="Etan attended the weekly standup", + 
source_chunk_id="chunk-abc", + ) + return store + + +# ── Constants Tests ──────────────────────────────────────── + + +class TestKGConstants: + """Verify shared constants match the spec.""" + + def test_entity_types_complete(self): + assert len(ENTITY_TYPES) == 7 + assert "person" in ENTITY_TYPES + assert "constraint" in ENTITY_TYPES + assert "preference" in ENTITY_TYPES + assert "life_event" in ENTITY_TYPES + assert "meeting" in ENTITY_TYPES + assert "location" in ENTITY_TYPES + assert "organization" in ENTITY_TYPES + + def test_relation_types_complete(self): + assert len(RELATION_TYPES) == 9 + assert "has_constraint" in RELATION_TYPES + assert "knows" in RELATION_TYPES + assert "supersedes" in RELATION_TYPES + + def test_decay_constants_keys(self): + assert "constraint" in DECAY_CONSTANTS + assert "preference" in DECAY_CONSTANTS + assert "life_event" in DECAY_CONSTANTS + assert "casual" in DECAY_CONSTANTS + assert "meeting" in DECAY_CONSTANTS + + def test_life_event_no_decay(self): + assert DECAY_CONSTANTS["life_event"] == 0 + + +# ── effective_score Tests ──────────────────────────────────── + + +class TestEffectiveScore: + """Test time-decayed scoring function.""" + + def test_perfect_score_at_zero_age(self): + score = effective_score(1.0, 1.0, 0, "constraint") + assert score == pytest.approx(1.0) + + def test_half_life_constraint(self): + # constraint half-life ~365 days: at 365 days, score ≈ 0.5 + score = effective_score(1.0, 1.0, 365, "constraint") + assert 0.45 < score < 0.55 + + def test_half_life_casual(self): + # casual half-life ~30 days: at 30 days, score ≈ 0.5 + score = effective_score(1.0, 1.0, 30, "casual") + assert 0.45 < score < 0.55 + + def test_life_event_no_decay(self): + score = effective_score(1.0, 1.0, 10000, "life_event") + assert score == pytest.approx(1.0) + + def test_confidence_importance_multiply(self): + score = effective_score(0.5, 0.5, 0, "constraint") + assert score == pytest.approx(0.25) + + def 
test_unknown_type_uses_default(self): + # Unknown type defaults to preference rate (0.0077) + score = effective_score(1.0, 1.0, 90, None) + assert 0.45 < score < 0.55 + + +# ── New Column Tests ──────────────────────────────────────── + + +class TestKGStandardColumns: + """Verify new standard columns exist on KG tables.""" + + def test_kg_entities_new_columns(self, store): + cursor = store._read_cursor() + cols = {row[1] for row in cursor.execute("PRAGMA table_info(kg_entities)")} + for col in ( + "canonical_name", + "description", + "confidence", + "importance", + "valid_from", + "valid_until", + "group_id", + ): + assert col in cols, f"Missing column: {col}" + + def test_kg_relations_new_columns(self, store): + cursor = store._read_cursor() + cols = {row[1] for row in cursor.execute("PRAGMA table_info(kg_relations)")} + for col in ("fact", "importance", "valid_from", "valid_until", "expired_at", "source_chunk_id"): + assert col in cols, f"Missing column: {col}" + + def test_kg_entity_chunks_mention_type(self, store): + cursor = store._read_cursor() + cols = {row[1] for row in cursor.execute("PRAGMA table_info(kg_entity_chunks)")} + assert "mention_type" in cols + + def test_kg_current_facts_view_exists(self, store): + cursor = store._read_cursor() + views = {row[0] for row in cursor.execute("SELECT name FROM sqlite_master WHERE type='view'")} + assert "kg_current_facts" in views + + def test_canonical_name_index(self, store): + cursor = store._read_cursor() + indexes = {row[1] for row in cursor.execute("PRAGMA index_list(kg_entities)")} + assert "idx_kg_entities_canonical" in indexes + + def test_validity_index_on_entities(self, store): + cursor = store._read_cursor() + indexes = {row[1] for row in cursor.execute("PRAGMA index_list(kg_entities)")} + assert "idx_kg_entities_valid" in indexes + + def test_validity_index_on_relations(self, store): + cursor = store._read_cursor() + indexes = {row[1] for row in cursor.execute("PRAGMA index_list(kg_relations)")} + 
assert "idx_kg_relations_validity" in indexes + + +# ── Entity CRUD with Standard Fields ──────────────────────── + + +class TestEntityStandardCRUD: + """Test entity upsert/get with new standard fields.""" + + def test_upsert_with_standard_fields(self, populated_store): + entity = populated_store.get_entity("person-1") + assert entity is not None + assert entity["canonical_name"] == "etan_heyman" + assert entity["description"] == "Software developer" + assert entity["confidence"] == 0.95 + assert entity["importance"] == 0.8 + assert entity["group_id"] == "default" + + def test_upsert_with_validity(self, populated_store): + entity = populated_store.get_entity("meeting-1") + assert entity["valid_from"] == "2026-02-27T09:00:00Z" + assert entity["valid_until"] == "2026-02-27T10:00:00Z" + + def test_upsert_defaults(self, store): + """New fields default to sensible values.""" + store.upsert_entity("e-1", "person", "Test") + entity = store.get_entity("e-1") + assert entity["confidence"] == 1.0 + assert entity["importance"] == 0.5 + assert entity["canonical_name"] == "test" # auto-lowered from name + assert entity["description"] is None + assert entity["valid_from"] is None + assert entity["valid_until"] is None + assert entity["group_id"] is None + + def test_backward_compat_metadata_still_works(self, store): + """Existing callers using metadata= kwarg still work.""" + store.upsert_entity("e-1", "person", "Test", metadata={"key": "value"}) + entity = store.get_entity("e-1") + assert entity["metadata"]["key"] == "value" + + +# ── Relation CRUD with Standard Fields ──────────────────────── + + +class TestRelationStandardCRUD: + """Test relation CRUD with new standard fields.""" + + def test_relation_with_fact(self, populated_store): + rels = populated_store.get_entity_relations("person-1", direction="outgoing") + works_at = [r for r in rels if r["relation_type"] == "works_at"][0] + assert works_at["fact"] == "Etan works at Cantaloupe" + assert works_at["importance"] == 0.9 
+ + def test_relation_with_source_chunk(self, populated_store): + rels = populated_store.get_entity_relations("person-1", direction="outgoing") + attended = [r for r in rels if r["relation_type"] == "attended"][0] + assert attended["source_chunk_id"] == "chunk-abc" + + def test_relation_defaults(self, store): + store.upsert_entity("e-1", "person", "A") + store.upsert_entity("e-2", "person", "B") + store.add_relation("r-1", "e-1", "e-2", "knows") + rels = store.get_entity_relations("e-1", direction="outgoing") + assert rels[0]["fact"] is None + assert rels[0]["importance"] == 0.5 + assert rels[0]["expired_at"] is None + assert rels[0]["source_chunk_id"] is None + + +# ── Soft-Close Relations ──────────────────────────────────── + + +class TestSoftCloseRelation: + """Test soft-closing (expiring) relations.""" + + def test_soft_close_sets_expired_at(self, populated_store): + populated_store.soft_close_relation("rel-1") + rels = populated_store.get_entity_relations("person-1", direction="outgoing") + works_at = [r for r in rels if r["relation_type"] == "works_at"][0] + assert works_at["expired_at"] is not None + + def test_current_facts_excludes_expired(self, populated_store): + # Both relations visible before soft-close + facts = populated_store.get_current_facts("person-1") + assert len(facts) == 2 + + # Soft-close one + populated_store.soft_close_relation("rel-1") + + # Only non-expired visible + facts = populated_store.get_current_facts("person-1") + assert len(facts) == 1 + assert facts[0]["relation_type"] == "attended" + + def test_current_facts_view_filters_expired(self, populated_store): + populated_store.soft_close_relation("rel-1") + cursor = populated_store._read_cursor() + rows = list(cursor.execute("SELECT * FROM kg_current_facts WHERE source_id = ?", ("person-1",))) + assert len(rows) == 1 + + +# ── 2-Hop Traversal ──────────────────────────────────────── + + +class TestGraphTraversal: + """Test multi-hop graph traversal via recursive CTE.""" + + def 
test_1hop_traversal(self, populated_store): + result = populated_store.traverse(entity_id="person-1", max_depth=1) + # person-1 → org-1, person-1 → meeting-1 + entity_ids = {r["entity_id"] for r in result} + assert "org-1" in entity_ids + assert "meeting-1" in entity_ids + + def test_2hop_traversal(self, store): + # Build A → B → C chain + store.upsert_entity("a", "person", "Alice") + store.upsert_entity("b", "person", "Bob") + store.upsert_entity("c", "person", "Carol") + store.add_relation("r1", "a", "b", "knows") + store.add_relation("r2", "b", "c", "knows") + + result = store.traverse(entity_id="a", max_depth=2) + entity_ids = {r["entity_id"] for r in result} + assert "b" in entity_ids # 1 hop + assert "c" in entity_ids # 2 hops + + def test_traversal_depth_limit(self, store): + # A → B → C, but max_depth=1 should not reach C + store.upsert_entity("a", "person", "Alice") + store.upsert_entity("b", "person", "Bob") + store.upsert_entity("c", "person", "Carol") + store.add_relation("r1", "a", "b", "knows") + store.add_relation("r2", "b", "c", "knows") + + result = store.traverse(entity_id="a", max_depth=1) + entity_ids = {r["entity_id"] for r in result} + assert "b" in entity_ids + assert "c" not in entity_ids + + def test_traversal_no_cycles(self, store): + # A ↔ B (bidirectional) should not loop + store.upsert_entity("a", "person", "Alice") + store.upsert_entity("b", "person", "Bob") + store.add_relation("r1", "a", "b", "knows") + store.add_relation("r2", "b", "a", "knows") + + result = store.traverse(entity_id="a", max_depth=3) + assert len(result) == 1 # Only b reachable + + +# ── Entity Resolution ──────────────────────────────────────── + + +class TestEntityResolution: + """Test entity resolution: exact alias → fuzzy name.""" + + def test_resolve_by_exact_alias(self, store): + store.upsert_entity("person-1", "person", "Etan Heyman") + store.add_entity_alias("EtanHey", "person-1", alias_type="nickname") + + result = store.resolve_entity("EtanHey") + assert 
result is not None + assert result["id"] == "person-1" + + def test_resolve_by_exact_name(self, store): + store.upsert_entity("person-1", "person", "Etan Heyman") + + result = store.resolve_entity("Etan Heyman") + assert result is not None + assert result["id"] == "person-1" + + def test_resolve_by_fuzzy_name(self, store): + store.upsert_entity("person-1", "person", "Etan Heyman") + + result = store.resolve_entity("Etan") + assert result is not None + assert result["id"] == "person-1" + + def test_resolve_not_found(self, store): + result = store.resolve_entity("nonexistent_xyzzy_12345") + assert result is None + + def test_resolve_by_canonical_name(self, store): + store.upsert_entity( + "person-1", + "person", + "Etan Heyman", + canonical_name="etan_heyman", + ) + result = store.resolve_entity("etan_heyman") + assert result is not None + assert result["id"] == "person-1" + + +# ── Link Entity Chunk with mention_type ──────────────────── + + +class TestEntityChunkMentionType: + """Test linking entities to chunks with mention_type.""" + + def test_link_with_mention_type(self, store): + store.upsert_entity("e-1", "person", "Test") + # We need a chunk to link to — use a minimal insert + cursor = store.conn.cursor() + cursor.execute( + """INSERT INTO chunks (id, content, metadata, source_file, project, content_type, value_type, char_count) + VALUES (?, ?, '{}', 'test.jsonl', 'test', 'user_message', 'HIGH', 10)""", + ("chunk-1", "test content"), + ) + emb = [0.1] * 1024 + from brainlayer.vector_store import serialize_f32 + + cursor.execute( + "INSERT INTO chunk_vectors (chunk_id, embedding) VALUES (?, ?)", + ("chunk-1", serialize_f32(emb)), + ) + + store.link_entity_chunk("e-1", "chunk-1", relevance=0.9, mention_type="explicit") + + chunks = store.get_entity_chunks("e-1") + assert chunks[0]["mention_type"] == "explicit" + + def test_link_default_mention_type(self, store): + store.upsert_entity("e-1", "person", "Test") + cursor = store.conn.cursor() + cursor.execute( + 
"""INSERT INTO chunks (id, content, metadata, source_file, project, content_type, value_type, char_count) + VALUES (?, ?, '{}', 'test.jsonl', 'test', 'user_message', 'HIGH', 10)""", + ("chunk-1", "test content"), + ) + emb = [0.1] * 1024 + from brainlayer.vector_store import serialize_f32 + + cursor.execute( + "INSERT INTO chunk_vectors (chunk_id, embedding) VALUES (?, ?)", + ("chunk-1", serialize_f32(emb)), + ) + + store.link_entity_chunk("e-1", "chunk-1") + chunks = store.get_entity_chunks("e-1") + assert chunks[0]["mention_type"] is None # Not set by default + + +# ── Migration Backward Compatibility ──────────────────────── + + +class TestBackwardCompatibility: + """Verify existing schema + data survives the migration.""" + + def test_existing_kg_tests_entity_types_still_work(self, store): + """The old entity types (golem, skill, project, topic) still work.""" + store.upsert_entity("golem-1", "golem", "brainClaude") + store.upsert_entity("skill-1", "skill", "railway-deploy") + entity = store.get_entity("golem-1") + assert entity is not None + assert entity["entity_type"] == "golem" + + def test_existing_relation_type_field(self, store): + """relation_type column still works (spec calls it 'relation').""" + store.upsert_entity("a", "person", "A") + store.upsert_entity("b", "person", "B") + store.add_relation("r1", "a", "b", "knows") + rels = store.get_entity_relations("a", direction="outgoing") + assert rels[0]["relation_type"] == "knows" + + def test_double_init_with_new_columns(self, tmp_path): + """Opening same DB twice doesn't crash with new columns.""" + db_path = tmp_path / "test.db" + s1 = VectorStore(db_path) + s1.upsert_entity("e-1", "person", "Test", confidence=0.9) + s1.close() + + s2 = VectorStore(db_path) + entity = s2.get_entity("e-1") + assert entity["confidence"] == 0.9 + s2.close()