diff --git a/src/brainlayer/mcp/__init__.py b/src/brainlayer/mcp/__init__.py index 1945fd6..629c0d0 100644 --- a/src/brainlayer/mcp/__init__.py +++ b/src/brainlayer/mcp/__init__.py @@ -47,7 +47,7 @@ async def _with_timeout(coro, timeout: float = MCP_QUERY_TIMEOUT): server = Server( "brainlayer", instructions=( - "Memory layer for Claude Code. 5 tools:\n" + "Memory layer for Claude Code. 6 tools:\n" "- brain_search(query): semantic search across 268K+ indexed conversation chunks. " "Filters: project, file_path, chunk_id, content_type, tag, intent, importance_min. " "Routing is automatic — pass file_path for file history, chunk_id to expand context, no args for current work.\n" @@ -638,6 +638,10 @@ async def list_tools() -> list[Tool]: "default": 10, "description": "Maximum results for think/recall modes (default: 10)", }, + "entity_id": { + "type": "string", + "description": "Filter results to chunks linked to this entity ID. Used for per-person memory scoping (e.g., get only memories about a specific person). Bypasses routing rules.", + }, }, "required": ["query"], }, @@ -701,11 +705,50 @@ async def list_tools() -> list[Tool]: "items": {"type": "string"}, "description": "Files affected by this decision.", }, + "entity_id": { + "type": "string", + "description": "Link this memory to an entity (e.g., a person). The stored chunk will be linked via kg_entity_chunks for per-person memory retrieval.", + }, }, "required": ["content"], }, outputSchema=_STORE_OUTPUT_SCHEMA, ), + Tool( + name="brain_get_person", + title="Get Person Context", + description="""Composite tool: look up a person entity and retrieve their scoped memories in one call. + +Returns the person's profile (hard_constraints, preferences, contact_info), +their relations in the knowledge graph, and relevant memory chunks linked to them. + +If 'context' is provided, memories are ranked by semantic relevance to the context. +Otherwise, memories are ordered by their entity-chunk relevance score. 
# Upper bound on memories returned per call; mirrors the ``maximum`` declared
# in the brain_get_person input schema.
_PERSON_MEMORY_LIMIT = 50
# Each memory's content is truncated to this many characters in the response.
_PERSON_MEMORY_PREVIEW_CHARS = 500


def _date_prefix(timestamp: str | None) -> str | None:
    """Return the first 10 characters of *timestamp*, or None when it is empty.

    For ISO-8601 timestamps this is the ``YYYY-MM-DD`` date part.
    """
    return timestamp[:10] if timestamp else None


def _memory_from_search_hit(doc: str, meta: dict) -> dict:
    """Shape one hybrid-search (document, metadata) pair into a memory record."""
    return {
        "content": doc[:_PERSON_MEMORY_PREVIEW_CHARS],
        "type": meta.get("content_type", "unknown"),
        "date": _date_prefix(meta.get("created_at")),
        "summary": meta.get("summary"),
    }


def _memory_from_entity_chunk(chunk: dict) -> dict:
    """Shape one row returned by ``get_entity_chunks`` into a memory record."""
    content = chunk.get("content") or ""
    return {
        "content": content[:_PERSON_MEMORY_PREVIEW_CHARS],
        "type": chunk.get("content_type", "unknown"),
        "date": _date_prefix(chunk.get("created_at")),
        "relevance": chunk.get("relevance"),
    }


async def _brain_get_person(
    name: str,
    context: str | None = None,
    num_memories: int = 10,
) -> CallToolResult:
    """Composite tool: look up a person entity + retrieve their scoped memories.

    Args:
        name: Person name passed to ``entity_lookup`` with ``entity_type="person"``.
        context: Optional free text. When given, memories come from
            ``hybrid_search`` scoped to the person's entity ID; otherwise they
            come from ``get_entity_chunks``.
        num_memories: Requested number of memories, clamped to
            ``1.._PERSON_MEMORY_LIMIT``.

    Returns:
        A ``CallToolResult`` whose text is JSON with:
        - entity_id, name
        - profile: the entity's metadata dict (also exposed split into
          hard_constraints / preferences / contact_info)
        - relations: the entity's ``relations`` list
        - memories / memory_count: memory records linked to this person
        - memories_error: present only when memory retrieval failed
        A plain-text result is returned when no entity matches, and an error
        result when the name is blank or the lookup itself raises.
    """
    import json

    from ..pipeline.digest import entity_lookup

    if not name or not name.strip():
        return _error_result("Person lookup failed: 'name' must be a non-empty string.")

    # Clients are not guaranteed to honour the JSON-schema bounds, so enforce
    # them here the same way call_tool clamps before/after.
    num_memories = max(1, min(num_memories, _PERSON_MEMORY_LIMIT))

    store = _get_vector_store()
    model = _get_embedding_model()
    # We are inside a coroutine, so the running loop is the one to use.
    loop = asyncio.get_running_loop()

    # Step 1: Look up the person entity (blocking call, run in the executor)
    try:
        entity = await loop.run_in_executor(
            None,
            lambda: entity_lookup(
                query=name,
                store=store,
                embed_fn=model.embed_query,
                entity_type="person",
            ),
        )
    except Exception as e:
        return _error_result(f"Person lookup failed: {e}")

    if entity is None:
        return CallToolResult(content=[TextContent(type="text", text=f"No person entity found matching '{name}'.")])

    entity_id = entity["id"]

    # Step 2: Get per-person scoped memories. This stays best-effort: a failure
    # here still returns the profile, but the failure is reported to the caller.
    memories: list[dict] = []
    memories_error: str | None = None
    try:
        if context:
            # Context provided: semantic + keyword search restricted to this
            # person's linked chunks.
            query_embedding = await loop.run_in_executor(None, model.embed_query, context)
            results = await loop.run_in_executor(
                None,
                lambda: store.hybrid_search(
                    query_embedding=query_embedding,
                    query_text=context,
                    n_results=num_memories,
                    entity_id=entity_id,
                ),
            )
            if results["documents"][0]:
                for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
                    memories.append(_memory_from_search_hit(doc, meta))
        else:
            # No context: return the entity's linked chunks as the store orders them.
            entity_chunks = await loop.run_in_executor(
                None,
                lambda: store.get_entity_chunks(entity_id, limit=num_memories),
            )
            for chunk in entity_chunks:
                memories.append(_memory_from_entity_chunk(chunk))
    except Exception as e:
        logger.warning("Memory retrieval for person '%s' failed: %s", name, e)
        memories_error = str(e)

    # Step 3: Build composite result. ``or {}`` also covers a metadata key that
    # is present but None.
    metadata = entity.get("metadata") or {}
    result = {
        "entity_id": entity_id,
        "name": entity["name"],
        "profile": metadata,
        "hard_constraints": metadata.get("hard_constraints", {}),
        "preferences": metadata.get("preferences", {}),
        "contact_info": metadata.get("contact_info", {}),
        "relations": entity.get("relations", []),
        "memories": memories,
        "memory_count": len(memories),
    }
    if memories_error is not None:
        # Lets the caller tell "no memories" apart from "retrieval failed".
        result["memories_error"] = memories_error

    return CallToolResult(content=[TextContent(type="text", text=json.dumps(result, indent=2))])
+ if project is None and entity_id is None and source not in ("youtube", "whatsapp", "telegram", "all"): try: from ..scoping import resolve_project_scope @@ -1175,6 +1336,23 @@ async def _brain_search( except Exception: pass # Scoping failure should never block search + # Entity-scoped search: skip routing rules, go straight to hybrid search + if entity_id is not None: + return await _search( + query=query, + project=project, + content_type=content_type, + num_results=num_results, + source=source, + tag=tag, + intent=intent, + importance_min=importance_min, + date_from=date_from, + date_to=date_to, + sentiment=sentiment, + entity_id=entity_id, + ) + # Rule 1: chunk context expand if chunk_id is not None: return await _context(chunk_id=chunk_id, before=before, after=after) @@ -1345,6 +1523,7 @@ async def _store_new( outcome: str | None = None, reversibility: str | None = None, files_changed: list[str] | None = None, + entity_id: str | None = None, ): """Wrapper for _store with auto-type detection and auto-importance.""" resolved_type = memory_type or _detect_memory_type(content) @@ -1359,6 +1538,7 @@ async def _store_new( outcome=outcome, reversibility=reversibility, files_changed=files_changed, + entity_id=entity_id, ) @@ -1377,6 +1557,7 @@ async def _search( date_from: str | None = None, date_to: str | None = None, sentiment: str | None = None, + entity_id: str | None = None, ): """Execute a hybrid search query (semantic + keyword via RRF).""" try: @@ -1410,6 +1591,11 @@ async def _search( else: source_filter = "claude_code" + # When searching by entity_id, skip source_filter default (entity memories + # may come from any source: manual, digest, claude_code, etc.) 
+ if entity_id and not source: + source_filter = None + # Use hybrid search (semantic + FTS5 keyword via RRF) results = store.hybrid_search( query_embedding=query_embedding, @@ -1424,6 +1610,7 @@ async def _search( date_from=date_from, date_to=date_to, sentiment_filter=sentiment, + entity_id=entity_id, ) if not results["documents"][0]: @@ -2057,6 +2244,7 @@ def _flush_pending_stores(store, embed_fn) -> int: outcome=item.get("outcome"), reversibility=item.get("reversibility"), files_changed=item.get("files_changed"), + entity_id=item.get("entity_id"), ) flushed += 1 except Exception: @@ -2081,6 +2269,7 @@ async def _store( outcome: str | None = None, reversibility: str | None = None, files_changed: list[str] | None = None, + entity_id: str | None = None, ): """Store a memory into BrainLayer. Buffers to JSONL on DB lock.""" try: @@ -2109,6 +2298,7 @@ def _embed(text: str) -> list[float]: outcome=outcome, reversibility=reversibility, files_changed=files_changed, + entity_id=entity_id, ), ) @@ -2151,6 +2341,7 @@ def _embed(text: str) -> list[float]: "outcome": outcome, "reversibility": reversibility, "files_changed": files_changed, + "entity_id": entity_id, } ) structured = {"chunk_id": "queued", "related": []} diff --git a/src/brainlayer/store.py b/src/brainlayer/store.py index 2f31b00..458ed78 100644 --- a/src/brainlayer/store.py +++ b/src/brainlayer/store.py @@ -42,6 +42,7 @@ def store_memory( outcome: Optional[str] = None, reversibility: Optional[str] = None, files_changed: Optional[List[str]] = None, + entity_id: Optional[str] = None, ) -> Dict[str, Any]: """Persistently store a memory into BrainLayer. @@ -57,6 +58,8 @@ def store_memory( outcome: Optional decision outcome (pending/validated/reversed). reversibility: Optional reversibility (easy/hard/destructive). files_changed: Optional list of affected file paths. + entity_id: Optional entity ID to link this memory to via kg_entity_chunks. + Used for per-person memory tagging. 
Returns: Dict with 'id' (chunk ID) and 'related' (list of similar existing memories). @@ -136,6 +139,19 @@ def store_memory( # (The trigger handles this for INSERT INTO chunks, but since we bypass # the normal upsert_chunks flow, verify it's there) + # Link to entity if entity_id provided (per-person memory tagging) + if entity_id: + # Validate entity exists to avoid dangling kg_entity_chunks rows + entity = store.get_entity(entity_id) + if entity is None: + raise ValueError(f"Unknown entity_id: {entity_id}") + store.link_entity_chunk( + entity_id=entity_id, + chunk_id=chunk_id, + relevance=1.0, + context=f"Stored via brain_store: {memory_type}", + ) + return { "id": chunk_id, "related": related, diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index 50fa146..396c1e2 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -564,8 +564,14 @@ def search( date_from: Optional[str] = None, date_to: Optional[str] = None, sentiment_filter: Optional[str] = None, + entity_id: Optional[str] = None, ) -> Dict[str, List]: - """Search chunks by embedding or text.""" + """Search chunks by embedding or text. + + Args: + entity_id: If provided, only return chunks linked to this entity + via kg_entity_chunks. Used for per-person memory scoping. + """ cursor = self._read_cursor() @@ -576,6 +582,9 @@ def search( where_clauses = [] filter_params: list = [] + if entity_id: + where_clauses.append("c.id IN (SELECT chunk_id FROM kg_entity_chunks WHERE entity_id = ?)") + filter_params.append(entity_id) if project_filter: where_clauses.append("c.project = ?") filter_params.append(project_filter) @@ -616,8 +625,11 @@ def search( if where_clauses: where_sql = "AND " + " AND ".join(where_clauses) - # sqlite-vec KNN: MATCH and k must bind before filter params - params = [query_bytes, n_results] + filter_params + # sqlite-vec KNN: MATCH and k must bind before filter params. 
+ # When entity_id is set, bump k to over-fetch since entity filter + # is applied post-KNN and most candidates won't match. + effective_k = min(n_results * 10, 1000) if entity_id else n_results + params = [query_bytes, effective_k] + filter_params query = f""" SELECT c.id, c.content, c.metadata, c.source_file, c.project, c.content_type, c.value_type, c.char_count, @@ -637,6 +649,9 @@ def search( where_clauses = ["content LIKE ?"] params = [f"%{query_text}%"] + if entity_id: + where_clauses.append("id IN (SELECT chunk_id FROM kg_entity_chunks WHERE entity_id = ?)") + params.append(entity_id) if project_filter: where_clauses.append("project = ?") params.append(project_filter) @@ -866,9 +881,15 @@ def hybrid_search( date_from: Optional[str] = None, date_to: Optional[str] = None, sentiment_filter: Optional[str] = None, + entity_id: Optional[str] = None, k: int = 60, ) -> Dict[str, List]: - """Hybrid search combining semantic (vector) + keyword (FTS5) via Reciprocal Rank Fusion.""" + """Hybrid search combining semantic (vector) + keyword (FTS5) via Reciprocal Rank Fusion. + + Args: + entity_id: If provided, only return chunks linked to this entity + via kg_entity_chunks. Used for per-person memory scoping. + """ # 1. Semantic search — get more results for fusion (uses _read_cursor via search()) semantic = self.search( @@ -885,6 +906,7 @@ def hybrid_search( date_from=date_from, date_to=date_to, sentiment_filter=sentiment_filter, + entity_id=entity_id, ) # Build semantic rank map: chunk_content -> rank @@ -901,6 +923,11 @@ def hybrid_search( # Wrap each term in double quotes to treat as literal strings. 
fts_query = _escape_fts5_query(query_text) fts_params: list = [fts_query] + entity_join = "" + if entity_id: + entity_join = "JOIN kg_entity_chunks ec ON c.id = ec.chunk_id" + fts_extra.append("AND ec.entity_id = ?") + fts_params.append(entity_id) if tag_filter: fts_extra.append( "AND c.tags IS NOT NULL AND json_valid(c.tags) = 1 AND EXISTS (SELECT 1 FROM json_each(c.tags) WHERE value = ?)" @@ -933,6 +960,7 @@ def hybrid_search( c.created_at, c.source FROM chunks_fts f JOIN chunks c ON f.chunk_id = c.id + {entity_join} WHERE chunks_fts MATCH ? {" ".join(fts_extra)} ORDER BY f.rank LIMIT ? diff --git a/tests/test_6pm_entity_upgrades.py b/tests/test_6pm_entity_upgrades.py new file mode 100644 index 0000000..bfe82a4 --- /dev/null +++ b/tests/test_6pm_entity_upgrades.py @@ -0,0 +1,505 @@ +"""Tests for 6PM entity upgrades — per-person memory scoping, brain_get_person, entity-tagged store. + +4 features tested: +1. Person profile schema convention on metadata JSON +2. Per-person memory scoping (search(entity_id=...)) +3. brain_get_person composite logic (entity_lookup + scoped memories) +4. 
import json

import pytest

from brainlayer.vector_store import VectorStore, serialize_f32

# ── Fixtures ────────────────────────────────────────────────────

_INSERT_CHUNK_SQL = """INSERT INTO chunks (id, content, metadata, source_file, project,
               content_type, value_type, char_count, source, created_at)
               VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)"""
_INSERT_VECTOR_SQL = "INSERT INTO chunk_vectors (chunk_id, embedding) VALUES (?, ?)"


def _insert_chunk(store, embed, chunk_id, content, content_type, created_at):
    """Insert one chunk row and its embedding row directly through ``store.conn``."""
    vector = embed(content)
    cur = store.conn.cursor()
    cur.execute(
        _INSERT_CHUNK_SQL,
        (
            chunk_id,
            content,
            json.dumps({}),
            "test.jsonl",
            "6pm-mini",
            content_type,
            "HIGH",
            len(content),
            "manual",
            created_at,
        ),
    )
    cur.execute(_INSERT_VECTOR_SQL, (chunk_id, serialize_f32(vector)))


@pytest.fixture
def store(tmp_path):
    """Yield a VectorStore backed by a fresh database file; close it afterwards."""
    vector_store = VectorStore(tmp_path / "test.db")
    yield vector_store
    vector_store.close()


@pytest.fixture
def mock_embed():
    """Provide a deterministic embedding function producing 1024-dim vectors."""

    def _embed(text: str) -> list[float]:
        # The seed depends only on the first 50 characters of the text.
        offset = sum(map(ord, text[:50])) % 100
        return [float(offset + i) / 1000.0 for i in range(1024)]

    return _embed


@pytest.fixture
def person_entity(store, mock_embed):
    """Upsert the 'Avi Simon' person entity with a 6PM-style profile; return its ID."""
    profile = {
        "hard_constraints": {
            "blocked_weekdays": ["SAT"],
            "not_before": "09:00",
            "not_after": "18:00",
        },
        "preferences": {
            "preferred_time_of_day": "MORNING",
            "duration_minutes": 30,
        },
        "contact_info": {
            "email": "avi@6pm.ai",
            "phone": "+972-54-1234567",
        },
    }
    return store.upsert_entity(
        entity_id="person-avi-simon",
        entity_type="person",
        name="Avi Simon",
        metadata=profile,
    )


@pytest.fixture
def person_with_chunks(store, mock_embed, person_entity):
    """Link four chunks to the person and add one unlinked chunk.

    Returns:
        ``(entity_id, chunk_ids)`` where ``chunk_ids`` are ``test-0`` .. ``test-3``
        in the order the messages below are listed. The extra ``test-unlinked``
        chunk is not linked to any entity.
    """
    entity_id = person_entity
    created_at = "2026-02-26T10:00:00Z"

    linked_messages = [
        ("Mondays are impossible for me, I have team standup all morning", "user_message"),
        ("Best time for me is after 15:00 on weekdays", "user_message"),
        ("I prefer video calls over phone calls", "user_message"),
        ("Meeting with Avi went well, he confirmed Thursday 2pm works", "assistant_text"),
    ]

    chunk_ids = []
    for index, (content, content_type) in enumerate(linked_messages):
        chunk_id = f"test-{index}"
        _insert_chunk(store, mock_embed, chunk_id, content, content_type, created_at)
        store.link_entity_chunk(entity_id, chunk_id, relevance=1.0, context="test message")
        chunk_ids.append(chunk_id)

    # An unlinked chunk that entity-scoped searches must never return.
    _insert_chunk(
        store,
        mock_embed,
        "test-unlinked",
        "Random unrelated message about weather",
        "user_message",
        created_at,
    )

    return entity_id, chunk_ids
# ── 1. Person Profile Schema Convention ────────────────────────


class TestPersonProfileSchema:
    """Verify the person profile metadata schema convention.

    Each test reads the entity created by the ``person_entity`` fixture back
    through ``store.get_entity`` and checks one part of its metadata.
    """

    def test_person_entity_stores_hard_constraints(self, store, person_entity):
        """Person entity metadata includes hard_constraints."""
        entity = store.get_entity(person_entity)
        assert entity is not None
        meta = entity["metadata"]
        assert "hard_constraints" in meta
        assert meta["hard_constraints"]["blocked_weekdays"] == ["SAT"]
        assert meta["hard_constraints"]["not_before"] == "09:00"
        assert meta["hard_constraints"]["not_after"] == "18:00"

    def test_person_entity_stores_preferences(self, store, person_entity):
        """Person entity metadata includes preferences."""
        entity = store.get_entity(person_entity)
        meta = entity["metadata"]
        assert "preferences" in meta
        assert meta["preferences"]["preferred_time_of_day"] == "MORNING"
        assert meta["preferences"]["duration_minutes"] == 30

    def test_person_entity_stores_contact_info(self, store, person_entity):
        """Person entity metadata includes contact_info."""
        entity = store.get_entity(person_entity)
        meta = entity["metadata"]
        assert "contact_info" in meta
        assert meta["contact_info"]["email"] == "avi@6pm.ai"

    def test_person_entity_type_is_person(self, store, person_entity):
        """Entity type is 'person'."""
        entity = store.get_entity(person_entity)
        assert entity["entity_type"] == "person"
        assert entity["name"] == "Avi Simon"

    def test_person_entity_upsert_updates_metadata(self, store, person_entity):
        """Upserting same entity updates metadata without creating duplicate."""
        # Upsert with updated constraints
        updated_id = store.upsert_entity(
            entity_id="person-avi-simon",
            entity_type="person",
            name="Avi Simon",
            metadata={
                "hard_constraints": {
                    "blocked_weekdays": ["SAT", "FRI"],
                    "not_before": "10:00",
                    "not_after": "17:00",
                },
                "preferences": {
                    "preferred_time_of_day": "AFTERNOON",
                },
            },
        )

        # Same entity, not a new one: without this check the "no duplicate"
        # half of the test's claim is never exercised.
        assert updated_id == person_entity

        entity = store.get_entity(updated_id)
        assert entity["metadata"]["hard_constraints"]["blocked_weekdays"] == ["SAT", "FRI"]
        assert entity["metadata"]["preferences"]["preferred_time_of_day"] == "AFTERNOON"
# ── 2. Per-Person Memory Scoping ──────────────────────────────


class TestPerPersonMemoryScoping:
    """Test entity_id filtering in search and hybrid_search.

    The ``person_with_chunks`` fixture provides four chunks linked to the
    person (``test-0`` .. ``test-3``) and one unlinked chunk (``test-unlinked``).
    Positive assertions are paired with the negative ones so that an empty
    result cannot make a scoping test pass.
    """

    def test_search_with_entity_id_filters_results(self, store, mock_embed, person_with_chunks):
        """search(entity_id=...) only returns chunks linked to that entity."""
        entity_id, chunk_ids = person_with_chunks
        embedding = mock_embed("what time works for meeting")

        results = store.search(
            query_embedding=embedding,
            n_results=20,
            entity_id=entity_id,
        )

        result_ids = results["ids"][0]
        assert result_ids, "entity-scoped search returned nothing"
        assert "test-unlinked" not in result_ids
        # Every hit must be one of the linked chunks (at most 4 of them).
        assert set(result_ids) <= set(chunk_ids)
        assert len(result_ids) <= 4

    def test_search_without_entity_id_returns_all(self, store, mock_embed, person_with_chunks):
        """search() without entity_id returns all chunks (including unlinked)."""
        entity_id, chunk_ids = person_with_chunks
        embedding = mock_embed("what time works for meeting")

        results = store.search(
            query_embedding=embedding,
            n_results=20,
        )

        result_ids = results["ids"][0]
        # Should include both linked and unlinked chunks
        assert len(result_ids) == 5  # 4 linked + 1 unlinked
        assert set(result_ids) == {*chunk_ids, "test-unlinked"}

    def test_hybrid_search_with_entity_id(self, store, mock_embed, person_with_chunks):
        """hybrid_search(entity_id=...) scopes both semantic and FTS to entity chunks."""
        entity_id, chunk_ids = person_with_chunks
        embedding = mock_embed("meeting schedule")

        results = store.hybrid_search(
            query_embedding=embedding,
            query_text="meeting",
            n_results=20,
            entity_id=entity_id,
        )

        result_ids = results["ids"][0]
        assert result_ids, "entity-scoped hybrid search returned nothing"
        assert "test-unlinked" not in result_ids
        assert set(result_ids) <= set(chunk_ids)

    def test_entity_id_with_no_linked_chunks_returns_empty(self, store, mock_embed):
        """Searching with entity_id that has no linked chunks returns empty."""
        # Create entity with no chunks
        entity_id = store.upsert_entity(
            entity_id="person-nobody",
            entity_type="person",
            name="Nobody",
            metadata={},
        )

        embedding = mock_embed("test query")
        results = store.search(
            query_embedding=embedding,
            n_results=10,
            entity_id=entity_id,
        )

        assert len(results["ids"][0]) == 0

    def test_text_search_with_entity_id(self, store, mock_embed, person_with_chunks):
        """Text search with entity_id only returns entity-linked chunks."""
        entity_id, chunk_ids = person_with_chunks

        results = store.search(
            query_text="impossible",
            n_results=20,
            entity_id=entity_id,
        )

        # Only the first linked chunk ("Mondays are impossible ...") contains
        # the search term; "Random unrelated ..." is neither linked nor a match.
        result_ids = results["ids"][0]
        assert "test-unlinked" not in result_ids
        assert len(result_ids) >= 1
        assert set(result_ids) == {chunk_ids[0]}
# ── 3. Entity-Tagged Store ────────────────────────────────────


class TestEntityTaggedStore:
    """Test store_memory(entity_id=...) auto-linking.

    Covers the linked case, the unlinked case, searchability of the linked
    chunk, the link's context string, and rejection of an unknown entity ID.
    """

    def test_store_with_entity_id_links_chunk(self, store, mock_embed, person_entity):
        """store_memory with entity_id links the new chunk to the entity."""
        from brainlayer.store import store_memory

        result = store_memory(
            store=store,
            embed_fn=mock_embed,
            content="Avi mentioned he can't do Mondays at all",
            memory_type="note",
            entity_id=person_entity,
        )

        chunk_id = result["id"]

        # Verify the chunk is linked to the entity
        entity_chunks = store.get_entity_chunks(person_entity)
        chunk_ids = [c["chunk_id"] for c in entity_chunks]
        assert chunk_id in chunk_ids

    def test_store_without_entity_id_no_link(self, store, mock_embed, person_entity):
        """store_memory without entity_id does NOT link to any entity."""
        from brainlayer.store import store_memory

        result = store_memory(
            store=store,
            embed_fn=mock_embed,
            content="General note about something unrelated",
            memory_type="note",
        )

        chunk_id = result["id"]

        # Should NOT be linked to the person entity
        entity_chunks = store.get_entity_chunks(person_entity)
        chunk_ids = [c["chunk_id"] for c in entity_chunks]
        assert chunk_id not in chunk_ids

    def test_stored_entity_linked_chunk_is_searchable_by_entity(self, store, mock_embed, person_entity):
        """After storing with entity_id, the chunk appears in entity-scoped search."""
        from brainlayer.store import store_memory

        store_memory(
            store=store,
            embed_fn=mock_embed,
            content="Avi prefers Tuesday afternoons for long meetings",
            memory_type="note",
            entity_id=person_entity,
        )

        # Search scoped to entity
        embedding = mock_embed("when does Avi prefer meetings")
        results = store.search(
            query_embedding=embedding,
            n_results=10,
            entity_id=person_entity,
        )

        # Should find the stored memory
        assert len(results["ids"][0]) >= 1
        found_contents = results["documents"][0]
        assert any("Tuesday afternoons" in c for c in found_contents)

    def test_store_entity_link_has_context(self, store, mock_embed, person_entity):
        """Entity link from store has descriptive context."""
        from brainlayer.store import store_memory

        store_memory(
            store=store,
            embed_fn=mock_embed,
            content="Test memory",
            memory_type="learning",
            entity_id=person_entity,
        )

        entity_chunks = store.get_entity_chunks(person_entity)
        assert len(entity_chunks) >= 1
        assert "brain_store" in entity_chunks[0]["context"]

    def test_store_with_unknown_entity_id_raises(self, store, mock_embed):
        """store_memory rejects an entity_id that does not exist.

        store_memory looks the entity up before linking and raises ValueError
        ("Unknown entity_id: ...") when the lookup returns None.
        """
        from brainlayer.store import store_memory

        with pytest.raises(ValueError, match="Unknown entity_id"):
            store_memory(
                store=store,
                embed_fn=mock_embed,
                content="Note about someone who was never registered",
                memory_type="note",
                entity_id="person-does-not-exist",
            )
# ── 4. brain_get_person Composite Logic (Unit-Level) ──────────


class TestBrainGetPersonLogic:
    """Exercise the pieces brain_get_person is built from.

    Covers ``entity_lookup`` for a person and ``get_entity_chunks`` for that
    person's linked chunks; the MCP handler itself is not invoked here.
    """

    @staticmethod
    def _lookup_avi(store, mock_embed, person_entity):
        """Register an entity embedding, then look 'Avi Simon' up as a person."""
        from brainlayer.pipeline.digest import entity_lookup

        # The entity needs a row in kg_vec_entities for the semantic side of the lookup.
        vector = mock_embed("Avi Simon person")
        store.conn.cursor().execute(
            "INSERT INTO kg_vec_entities (entity_id, embedding) VALUES (?, ?)",
            (person_entity, serialize_f32(vector)),
        )

        return entity_lookup(
            query="Avi Simon",
            store=store,
            embed_fn=mock_embed,
            entity_type="person",
        )

    def test_entity_lookup_finds_person(self, store, mock_embed, person_entity):
        """A lookup by name returns the person entity with its profile metadata."""
        found = self._lookup_avi(store, mock_embed, person_entity)

        assert found is not None
        assert found["name"] == "Avi Simon"
        assert found["entity_type"] == "person"
        assert "hard_constraints" in found["metadata"]

    def test_entity_lookup_returns_metadata_with_constraints(self, store, mock_embed, person_entity):
        """The lookup result carries the structured profile fields."""
        found = self._lookup_avi(store, mock_embed, person_entity)

        profile = found["metadata"]
        assert profile["hard_constraints"]["not_before"] == "09:00"
        assert profile["preferences"]["preferred_time_of_day"] == "MORNING"

    def test_entity_chunks_ordered_by_relevance(self, store, mock_embed, person_with_chunks):
        """get_entity_chunks returns all four linked chunks with their relevance."""
        entity_id, _chunk_ids = person_with_chunks

        linked = store.get_entity_chunks(entity_id, limit=10)
        assert len(linked) == 4

        # Every fixture link was created with relevance 1.0.
        for row in linked:
            assert row["relevance"] == 1.0
            assert row["content"] is not None

    def test_get_entity_chunks_with_content(self, store, mock_embed, person_with_chunks):
        """get_entity_chunks exposes the text of the linked chunks."""
        entity_id, _chunk_ids = person_with_chunks

        texts = [row["content"] for row in store.get_entity_chunks(entity_id, limit=10)]
        assert any("impossible" in text for text in texts)
        assert any("15:00" in text for text in texts)
# ── 5. Multiple Persons Isolation ─────────────────────────────


class TestMultiplePersonIsolation:
    """Check that entity-scoped search keeps different people's chunks apart."""

    def test_two_persons_isolated_search(self, store, mock_embed):
        """Each person's scoped search returns their own chunk and not the other's."""
        # Create both people first, then their chunks.
        entity_ids = {}
        for key, display_name, time_of_day in (
            ("person-a", "Person A", "MORNING"),
            ("person-b", "Person B", "EVENING"),
        ):
            entity_ids[key] = store.upsert_entity(
                entity_id=key,
                entity_type="person",
                name=display_name,
                metadata={"preferences": {"preferred_time_of_day": time_of_day}},
            )

        created_at = "2026-02-26T10:00:00Z"
        for key, text, chunk_id in (
            ("person-a", "Person A likes mornings", "chunk-a"),
            ("person-b", "Person B prefers evenings", "chunk-b"),
        ):
            vector = mock_embed(text)
            cur = store.conn.cursor()
            cur.execute(
                """INSERT INTO chunks (id, content, metadata, source_file, project,
                   content_type, value_type, char_count, source, created_at)
                   VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""",
                (chunk_id, text, "{}", "test.jsonl", "test", "note", "HIGH", len(text), "manual", created_at),
            )
            cur.execute(
                "INSERT INTO chunk_vectors (chunk_id, embedding) VALUES (?, ?)",
                (chunk_id, serialize_f32(vector)),
            )
            store.link_entity_chunk(entity_ids[key], chunk_id, relevance=1.0)

        # Run the same query once per person, scoped by entity ID.
        probe = mock_embed("time preference")
        hits = {
            key: store.search(
                query_embedding=probe,
                n_results=10,
                entity_id=entity_id,
            )["ids"][0]
            for key, entity_id in entity_ids.items()
        }

        # Person A should only see chunk-a
        assert "chunk-a" in hits["person-a"]
        assert "chunk-b" not in hits["person-a"]

        # Person B should only see chunk-b
        assert "chunk-b" in hits["person-b"]
        assert "chunk-a" not in hits["person-b"]