Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 194 additions & 3 deletions src/brainlayer/mcp/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ async def _with_timeout(coro, timeout: float = MCP_QUERY_TIMEOUT):
server = Server(
"brainlayer",
instructions=(
"Memory layer for Claude Code. 5 tools:\n"
"Memory layer for Claude Code. 6 tools:\n"
"- brain_search(query): semantic search across 268K+ indexed conversation chunks. "
"Filters: project, file_path, chunk_id, content_type, tag, intent, importance_min. "
"Routing is automatic — pass file_path for file history, chunk_id to expand context, no args for current work.\n"
Expand Down Expand Up @@ -638,6 +638,10 @@ async def list_tools() -> list[Tool]:
"default": 10,
"description": "Maximum results for think/recall modes (default: 10)",
},
"entity_id": {
"type": "string",
"description": "Filter results to chunks linked to this entity ID. Used for per-person memory scoping (e.g., get only memories about a specific person). Bypasses routing rules.",
},
},
"required": ["query"],
},
Expand Down Expand Up @@ -701,11 +705,50 @@ async def list_tools() -> list[Tool]:
"items": {"type": "string"},
"description": "Files affected by this decision.",
},
"entity_id": {
"type": "string",
"description": "Link this memory to an entity (e.g., a person). The stored chunk will be linked via kg_entity_chunks for per-person memory retrieval.",
},
},
"required": ["content"],
},
outputSchema=_STORE_OUTPUT_SCHEMA,
),
Tool(
name="brain_get_person",
title="Get Person Context",
description="""Composite tool: look up a person entity and retrieve their scoped memories in one call.

Returns the person's profile (hard_constraints, preferences, contact_info),
their relations in the knowledge graph, and relevant memory chunks linked to them.

If 'context' is provided, memories are ranked by semantic relevance to the context.
Otherwise, memories are ordered by their entity-chunk relevance score.

Designed for copilot agents that need full person context in a single call.""",
annotations=_READ_ONLY,
inputSchema={
"type": "object",
"properties": {
"name": {
"type": "string",
"description": "Person name to look up (e.g., 'Avi Simon'). Searches by FTS + semantic match.",
},
"context": {
"type": "string",
"description": "Optional meeting/conversation context to rank memories by relevance (e.g., 'schedule a meeting next week about product roadmap').",
},
"num_memories": {
"type": "integer",
"default": 10,
"minimum": 1,
"maximum": 50,
"description": "Number of memory chunks to return (default: 10).",
},
},
"required": ["name"],
},
),
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Tool(
name="brain_recall",
title="Recall Context",
Expand Down Expand Up @@ -906,6 +949,7 @@ async def call_tool(name: str, arguments: dict[str, Any]):
date_from=arguments.get("date_from"),
date_to=arguments.get("date_to"),
sentiment=arguments.get("sentiment"),
entity_id=arguments.get("entity_id"),
num_results=arguments.get("num_results", 5),
before=max(0, min(arguments.get("before", 3), 50)),
after=max(0, min(arguments.get("after", 3), 50)),
Expand All @@ -927,6 +971,16 @@ async def call_tool(name: str, arguments: dict[str, Any]):
outcome=arguments.get("outcome"),
reversibility=arguments.get("reversibility"),
files_changed=arguments.get("files_changed"),
entity_id=arguments.get("entity_id"),
)

elif name == "brain_get_person":
return await _with_timeout(
_brain_get_person(
name=arguments["name"],
context=arguments.get("context"),
num_memories=arguments.get("num_memories", 10),
)
)

elif name == "brain_recall":
Expand Down Expand Up @@ -1141,6 +1195,105 @@ async def _brain_entity(
return CallToolResult(content=[TextContent(type="text", text=json.dumps(result, indent=2))])


async def _brain_get_person(
    name: str,
    context: str | None = None,
    num_memories: int = 10,
) -> CallToolResult:
    """Composite tool: look up a person entity + retrieve their scoped memories.

    Args:
        name: Person name to look up (delegated to ``entity_lookup``, which
            searches by FTS + semantic match with ``entity_type="person"``).
        context: Optional text used to rank memories by semantic relevance
            via entity-scoped hybrid search. When omitted, the entity's
            linked chunks are returned in their stored relevance order.
        num_memories: Maximum number of memory chunks to return.

    Returns:
        CallToolResult whose text content is JSON with:
        - profile / hard_constraints / preferences / contact_info: entity metadata
        - relations: entity relations from the knowledge graph
        - memories + memory_count: memory chunks linked to this person
    """
    import json

    from ..pipeline.digest import entity_lookup

    store = _get_vector_store()
    model = _get_embedding_model()
    # get_running_loop() is the non-deprecated way to obtain the loop from
    # inside a coroutine (get_event_loop() is deprecated here since 3.10).
    loop = asyncio.get_running_loop()

    # Step 1: Look up the person entity (blocking call, run off the loop).
    try:
        entity = await loop.run_in_executor(
            None,
            lambda: entity_lookup(
                query=name,
                store=store,
                embed_fn=model.embed_query,
                entity_type="person",
            ),
        )
    except Exception as e:
        return _error_result(f"Person lookup failed: {e}")

    if entity is None:
        return CallToolResult(content=[TextContent(type="text", text=f"No person entity found matching '{name}'.")])

    entity_id = entity["id"]

    # Step 2: Get per-person scoped memories. Failures here are deliberately
    # non-fatal — the profile alone is still useful, so log and continue.
    memories = []
    try:
        if context:
            # Context provided: semantic search scoped to this person's chunks.
            query_embedding = await loop.run_in_executor(None, model.embed_query, context)
            results = await loop.run_in_executor(
                None,
                lambda: store.hybrid_search(
                    query_embedding=query_embedding,
                    query_text=context,
                    n_results=num_memories,
                    entity_id=entity_id,
                ),
            )
            if results["documents"][0]:
                for doc, meta in zip(results["documents"][0], results["metadatas"][0]):
                    memories.append(
                        {
                            "content": doc[:500],
                            "type": meta.get("content_type", "unknown"),
                            "date": meta.get("created_at", "")[:10] if meta.get("created_at") else None,
                            "summary": meta.get("summary"),
                        }
                    )
        else:
            # No context: return the entity's linked chunks ordered by relevance.
            entity_chunks = await loop.run_in_executor(
                None,
                lambda: store.get_entity_chunks(entity_id, limit=num_memories),
            )
            for chunk in entity_chunks:
                memories.append(
                    {
                        "content": chunk["content"][:500] if chunk.get("content") else "",
                        "type": chunk.get("content_type", "unknown"),
                        "date": chunk.get("created_at", "")[:10] if chunk.get("created_at") else None,
                        "relevance": chunk.get("relevance"),
                    }
                )
    except Exception as e:
        logger.warning("Memory retrieval for person '%s' failed: %s", name, e)

    # Step 3: Build composite result. `or {}` also covers an explicit
    # metadata=None value, which dict.get's default would not.
    metadata = entity.get("metadata") or {}
    result = {
        "entity_id": entity_id,
        "name": entity["name"],
        "profile": metadata,
        "hard_constraints": metadata.get("hard_constraints", {}),
        "preferences": metadata.get("preferences", {}),
        "contact_info": metadata.get("contact_info", {}),
        "relations": entity.get("relations", []),
        "memories": memories,
        "memory_count": len(memories),
    }

    return CallToolResult(content=[TextContent(type="text", text=json.dumps(result, indent=2))])


# --- Consolidated Dispatchers (Phase 4) ---


Expand All @@ -1157,24 +1310,49 @@ async def _brain_search(
date_from: str | None = None,
date_to: str | None = None,
sentiment: str | None = None,
entity_id: str | None = None,
num_results: int = 5,
before: int = 3,
after: int = 3,
max_results: int = 10,
):
"""Unified search dispatcher — routes to the right internal handler."""
"""Unified search dispatcher — routes to the right internal handler.

Args:
entity_id: If provided, only return chunks linked to this entity.
Bypasses most routing rules and goes straight to hybrid search
with entity scoping. Used for per-person memory retrieval.
"""

# Auto-scope project from CWD if not provided — but ONLY for claude_code source.
# Non-claude_code sources (youtube, whatsapp, etc.) have null/different project values,
# so auto-scoping filters them out entirely (bug: brain_search(source="youtube") → 0 results).
if project is None and source not in ("youtube", "whatsapp", "telegram", "all"):
# Also skip auto-scope when entity_id is set (entity-scoped search is cross-project).
if project is None and entity_id is None and source not in ("youtube", "whatsapp", "telegram", "all"):
try:
from ..scoping import resolve_project_scope

project = resolve_project_scope()
except Exception:
pass # Scoping failure should never block search

# Entity-scoped search: skip routing rules, go straight to hybrid search
if entity_id is not None:
return await _search(
query=query,
project=project,
content_type=content_type,
num_results=num_results,
source=source,
tag=tag,
intent=intent,
importance_min=importance_min,
date_from=date_from,
date_to=date_to,
sentiment=sentiment,
entity_id=entity_id,
)

# Rule 1: chunk context expand
if chunk_id is not None:
return await _context(chunk_id=chunk_id, before=before, after=after)
Expand Down Expand Up @@ -1345,6 +1523,7 @@ async def _store_new(
outcome: str | None = None,
reversibility: str | None = None,
files_changed: list[str] | None = None,
entity_id: str | None = None,
):
"""Wrapper for _store with auto-type detection and auto-importance."""
resolved_type = memory_type or _detect_memory_type(content)
Expand All @@ -1359,6 +1538,7 @@ async def _store_new(
outcome=outcome,
reversibility=reversibility,
files_changed=files_changed,
entity_id=entity_id,
)


Expand All @@ -1377,6 +1557,7 @@ async def _search(
date_from: str | None = None,
date_to: str | None = None,
sentiment: str | None = None,
entity_id: str | None = None,
):
"""Execute a hybrid search query (semantic + keyword via RRF)."""
try:
Expand Down Expand Up @@ -1410,6 +1591,11 @@ async def _search(
else:
source_filter = "claude_code"

# When searching by entity_id, skip source_filter default (entity memories
# may come from any source: manual, digest, claude_code, etc.)
if entity_id and not source:
source_filter = None

# Use hybrid search (semantic + FTS5 keyword via RRF)
results = store.hybrid_search(
query_embedding=query_embedding,
Expand All @@ -1424,6 +1610,7 @@ async def _search(
date_from=date_from,
date_to=date_to,
sentiment_filter=sentiment,
entity_id=entity_id,
)

if not results["documents"][0]:
Expand Down Expand Up @@ -2057,6 +2244,7 @@ def _flush_pending_stores(store, embed_fn) -> int:
outcome=item.get("outcome"),
reversibility=item.get("reversibility"),
files_changed=item.get("files_changed"),
entity_id=item.get("entity_id"),
)
flushed += 1
except Exception:
Expand All @@ -2081,6 +2269,7 @@ async def _store(
outcome: str | None = None,
reversibility: str | None = None,
files_changed: list[str] | None = None,
entity_id: str | None = None,
):
"""Store a memory into BrainLayer. Buffers to JSONL on DB lock."""
try:
Expand Down Expand Up @@ -2109,6 +2298,7 @@ def _embed(text: str) -> list[float]:
outcome=outcome,
reversibility=reversibility,
files_changed=files_changed,
entity_id=entity_id,
Comment thread
cursor[bot] marked this conversation as resolved.
),
)

Expand Down Expand Up @@ -2151,6 +2341,7 @@ def _embed(text: str) -> list[float]:
"outcome": outcome,
"reversibility": reversibility,
"files_changed": files_changed,
"entity_id": entity_id,
}
)
structured = {"chunk_id": "queued", "related": []}
Expand Down
16 changes: 16 additions & 0 deletions src/brainlayer/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ def store_memory(
outcome: Optional[str] = None,
reversibility: Optional[str] = None,
files_changed: Optional[List[str]] = None,
entity_id: Optional[str] = None,
) -> Dict[str, Any]:
"""Persistently store a memory into BrainLayer.

Expand All @@ -57,6 +58,8 @@ def store_memory(
outcome: Optional decision outcome (pending/validated/reversed).
reversibility: Optional reversibility (easy/hard/destructive).
files_changed: Optional list of affected file paths.
entity_id: Optional entity ID to link this memory to via kg_entity_chunks.
Used for per-person memory tagging.

Returns:
Dict with 'id' (chunk ID) and 'related' (list of similar existing memories).
Expand Down Expand Up @@ -136,6 +139,19 @@ def store_memory(
# (The trigger handles this for INSERT INTO chunks, but since we bypass
# the normal upsert_chunks flow, verify it's there)

# Link to entity if entity_id provided (per-person memory tagging)
if entity_id:
# Validate entity exists to avoid dangling kg_entity_chunks rows
entity = store.get_entity(entity_id)
if entity is None:
raise ValueError(f"Unknown entity_id: {entity_id}")
store.link_entity_chunk(
entity_id=entity_id,
chunk_id=chunk_id,
relevance=1.0,
context=f"Stored via brain_store: {memory_type}",
)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Entity validation after chunk commit creates orphaned data

Medium Severity

The entity_id validation (store.get_entity(entity_id)) happens after the chunk and its embedding have already been written to the database. Since the project uses apsw, which auto-commits each statement, both the INSERT INTO chunks and INSERT INTO chunk_vectors are permanently committed before the entity check runs. If the entity doesn't exist, ValueError is raised, the caller returns a "Validation error" to the user, but the chunk is already persisted — orphaned and unlinked. The user, believing the store failed, may retry and create a duplicate.

Moving the entity existence check before the chunk insert would fix the atomicity issue.

Fix in Cursor Fix in Web


return {
"id": chunk_id,
"related": related,
Expand Down
Loading