From a14c082639e6623345ec768e92eddc110232e0e8 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 16:20:32 +0300 Subject: [PATCH 01/15] fix: block recursive brainlayer ingest --- src/brainlayer/drain.py | 13 ++ src/brainlayer/engine.py | 5 + src/brainlayer/ingest_guard.py | 34 +++ src/brainlayer/mcp/__init__.py | 12 ++ src/brainlayer/mcp/search_handler.py | 100 +++++++-- src/brainlayer/mcp/store_handler.py | 2 + src/brainlayer/search_repo.py | 133 +++++++++++- src/brainlayer/store.py | 2 + src/brainlayer/vector_store.py | 4 + src/brainlayer/watcher_bridge.py | 7 + tests/test_audit_recursion_filter.py | 288 ++++++++++++++++++++++++++ tests/test_ingest_guard.py | 113 ++++++++++ tests/test_precompact_chunk_origin.py | 38 ++++ tests/test_search_exact_chunk_id.py | 10 +- 14 files changed, 733 insertions(+), 28 deletions(-) create mode 100644 src/brainlayer/ingest_guard.py create mode 100644 tests/test_audit_recursion_filter.py create mode 100644 tests/test_ingest_guard.py diff --git a/src/brainlayer/drain.py b/src/brainlayer/drain.py index 77ae6d27..4afd1f18 100644 --- a/src/brainlayer/drain.py +++ b/src/brainlayer/drain.py @@ -28,6 +28,7 @@ merge_existing_chunk_content, merge_existing_chunk_seen, ) +from .ingest_guard import recursive_mcp_output_reason from .paths import get_db_path logger = logging.getLogger(__name__) @@ -159,6 +160,10 @@ def _apply_store(conn: apsw.Connection, event: dict[str, Any]) -> ApplyResult: if not content: logger.warning("Skipping malformed store event with empty content") return ApplyResult() + recursive_reason = recursive_mcp_output_reason(content) + if recursive_reason: + logger.warning("Skipping recursive MCP store event: %s", recursive_reason) + return ApplyResult() now = datetime.now(timezone.utc).isoformat() metadata = {"memory_type": event.get("memory_type", "note")} raw_metadata = event.get("metadata") @@ -239,6 +244,10 @@ def _apply_watcher(conn: apsw.Connection, event: dict[str, Any]) -> None: if not content: logger.warning("Skipping malformed watcher event with empty content") return + recursive_reason = recursive_mcp_output_reason(content) + if recursive_reason: + logger.warning("Skipping recursive MCP watcher event: %s", recursive_reason) + return tags = event.get("tags") _insert_or_merge_chunk( conn, @@ -270,6 +279,10 @@ def _apply_hook(conn: apsw.Connection, event: dict[str, Any]) -> None: if not content: logger.warning("Skipping malformed hook event with empty content") return + recursive_reason = recursive_mcp_output_reason(content) + if recursive_reason: + logger.warning("Skipping recursive MCP hook event: %s", recursive_reason) + return content_hash = event.get("content_hash") or hashlib.sha256(content.encode()).hexdigest()[:16] session_id = event.get("session_id") or "unknown" chunk_id = event.get("chunk_id") or f"rt-{str(session_id)[:8]}-{content_hash}" diff --git a/src/brainlayer/engine.py b/src/brainlayer/engine.py index 80a53655..5511cd5c 100644 --- a/src/brainlayer/engine.py +++ b/src/brainlayer/engine.py @@ -178,6 +178,7 @@ def think( embed_fn: Any, project: str | None = None, max_results: int = 10, + include_audit: bool = False, ) -> ThinkResult: """Given current task context, retrieve relevant past knowledge. @@ -206,6 +207,7 @@ def think( n_results=max_results, project_filter=project, importance_min=3.0, # Skip low-importance noise + include_audit=include_audit, ) if not results["documents"][0]: @@ -239,6 +241,7 @@ def recall( topic: str | None = None, project: str | None = None, max_results: int = 10, + include_audit: bool = False, ) -> RecallResult: """Proactive smart retrieval based on file or topic. @@ -278,6 +281,7 @@ def recall( query_text=fname, n_results=max_results, project_filter=project, + include_audit=include_audit, ) for doc, meta in zip(search_results["documents"][0], search_results["metadatas"][0]): result.related_chunks.append( @@ -299,6 +303,7 @@ def recall( query_text=topic, n_results=max_results, project_filter=project, + include_audit=include_audit, ) for doc, meta in zip(search_results["documents"][0], search_results["metadatas"][0]): result.related_chunks.append( diff --git a/src/brainlayer/ingest_guard.py b/src/brainlayer/ingest_guard.py new file mode 100644 index 00000000..8270d3f5 --- /dev/null +++ b/src/brainlayer/ingest_guard.py @@ -0,0 +1,34 @@ +"""Write-side guards for content that must never enter BrainLayer.""" + +from __future__ import annotations + +import re + +_JSONRPC_MESSAGE_RE = re.compile(r'"jsonrpc"\s*:\s*"2\.0"', re.IGNORECASE) +_INVALID_JSONRPC_MARKER = "mcp brainlayer memory: invalid json-rpc message" +_BRAIN_SEARCH_BOX_PREFIX = "┌─ brain_search:" + + +def recursive_mcp_output_reason(content: str | None) -> str | None: + """Return a reason when content is BrainLayer MCP output being re-ingested.""" + if not content: + return None + + stripped = str(content).lstrip() + if stripped.startswith(_BRAIN_SEARCH_BOX_PREFIX): + return "brain_search_output" + + folded = stripped.casefold() + if _INVALID_JSONRPC_MARKER in folded: + return "invalid_jsonrpc_mcp_output" + if _JSONRPC_MESSAGE_RE.search(stripped): + return "jsonrpc_message" + + return None + + +def reject_recursive_mcp_output(content: str | None) -> None: + """Raise ValueError when content is recursive BrainLayer MCP output.""" + reason = recursive_mcp_output_reason(content) + if reason: + raise ValueError(f"recursive MCP output is not stored in BrainLayer: {reason}") diff --git a/src/brainlayer/mcp/__init__.py b/src/brainlayer/mcp/__init__.py index eec119d3..f45fe5a5 100644 --- a/src/brainlayer/mcp/__init__.py +++ b/src/brainlayer/mcp/__init__.py @@ -511,6 +511,11 @@ async def list_tools() -> list[Tool]: "default": False, "description": "Include PreCompact checkpoint chunks in search results. Defaults to false; use brain_resume for explicit session recovery.", }, + "include_audit": { + "type": "boolean", + "default": False, + "description": "Opt in to audit/eval and recursive MCP-output memories. Defaults false to prevent audit-recursion pollution.", + }, "detail": { "type": "string", "enum": ["compact", "full"], @@ -828,6 +833,11 @@ async def list_tools() -> list[Tool]: "default": False, "description": "Include PreCompact checkpoint chunks in mode=search results. Defaults to false; use brain_resume for explicit session recovery.", }, + "include_audit": { + "type": "boolean", + "default": False, + "description": "Opt in to audit/eval and recursive MCP-output memories in mode=search. Defaults false to prevent audit-recursion pollution.", + }, }, } ), @@ -1255,6 +1265,7 @@ async def call_tool(name: str, arguments: dict[str, Any]): source_filter=resolved_source_filter, correction_category=arguments.get("correction_category"), include_checkpoints=arguments.get("include_checkpoints", False), + include_audit=arguments.get("include_audit", False), ) ) @@ -1343,6 +1354,7 @@ async def call_tool(name: str, arguments: dict[str, Any]): detail=arguments.get("detail", "compact"), entity_type=arguments.get("entity_type"), include_checkpoints=arguments.get("include_checkpoints", False), + include_audit=arguments.get("include_audit", False), ) ) diff --git a/src/brainlayer/mcp/search_handler.py b/src/brainlayer/mcp/search_handler.py index a2840608..145db69d 100644 --- a/src/brainlayer/mcp/search_handler.py +++ b/src/brainlayer/mcp/search_handler.py @@ -10,8 +10,9 @@ from mcp.types import TextContent from .._helpers import _escape_fts5_query, _is_sqlite_busy_error -from ..chunk_origin import CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT +from ..chunk_origin import CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT, is_precompact_checkpoint_content from ..lexical_defense import _normalize_surface, load_lexical_defense_dictionary +from ..search_repo import _is_audit_recursion_metadata # Retry settings for DB lock resilience on reads _RETRY_MAX_ATTEMPTS = 3 @@ -39,6 +40,11 @@ _CHUNK_ID_QUERY_RE = re.compile(r"^[A-Za-z][A-Za-z0-9_]*(?:-[A-Za-z0-9_]+)+$") +def _empty_exact_chunk_lookup_result(query: str) -> tuple[list[TextContent], dict]: + structured: dict[str, Any] = {"query": query, "total": 0, "results": []} + return ([TextContent(type="text", text=format_search_results(query, [], 0))], structured) + + def _utcnow_iso() -> str: return datetime.now(timezone.utc).isoformat() @@ -170,6 +176,7 @@ def _exact_chunk_lookup_result( source_filter: str | None = None, correction_category: str | None = None, include_checkpoints: bool = False, + include_audit: bool = False, ) -> tuple[list[TextContent], dict] | None: """Return an exact chunk hit for chunk-id shaped queries, or None on miss.""" candidate = query.strip() @@ -180,19 +187,21 @@ def _exact_chunk_lookup_result( if not chunk: return None if any(chunk.get(field) is not None for field in ("superseded_by", "aggregated_into", "archived_at")): - return None - if not include_checkpoints and chunk.get("chunk_origin") == CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT: - empty = {"query": query, "total": 0, "results": []} - return ([TextContent(type="text", text="No results found.")], empty) + return _empty_exact_chunk_lookup_result(query) + if not include_checkpoints and ( + chunk.get("chunk_origin") == CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT + or is_precompact_checkpoint_content(chunk.get("content")) + ): + return _empty_exact_chunk_lookup_result(query) if any(value is not None for value in (source, intent, sentiment, source_filter, correction_category)): - return None + return _empty_exact_chunk_lookup_result(query) if project is not None: chunk_project = _normalize_project_name(chunk.get("project")) or chunk.get("project") normalized_project = _normalize_project_name(project) or project if chunk_project not in (normalized_project, None): - return None + return _empty_exact_chunk_lookup_result(query) if content_type is not None and chunk.get("content_type") != content_type: - return None + return _empty_exact_chunk_lookup_result(query) tags = chunk.get("tags") parsed_tags = None @@ -202,16 +211,18 @@ def _exact_chunk_lookup_result( except (json.JSONDecodeError, TypeError): parsed_tags = None if tag is not None and tag not in (parsed_tags or []): - return None + return _empty_exact_chunk_lookup_result(query) + if not include_audit and _is_audit_recursion_metadata({"tags": parsed_tags or []}, chunk.get("content")): + return _empty_exact_chunk_lookup_result(query) if importance_min is not None: chunk_importance = chunk.get("importance") if not isinstance(chunk_importance, (int, float)) or float(chunk_importance) < float(importance_min): - return None + return _empty_exact_chunk_lookup_result(query) chunk_date = chunk.get("created_at", "")[:10] if chunk.get("created_at") else None if date_from is not None and (chunk_date is None or chunk_date < date_from): - return None + return _empty_exact_chunk_lookup_result(query) if date_to is not None and (chunk_date is None or chunk_date > date_to): - return None + return _empty_exact_chunk_lookup_result(query) item = { "score": 1.0, @@ -424,6 +435,7 @@ async def _brain_search( source_filter: str | None = None, correction_category: str | None = None, include_checkpoints: bool = False, + include_audit: bool = False, ): """Unified search dispatcher -- routes to the right internal handler.""" @@ -460,19 +472,32 @@ async def _brain_search( source_filter_like=source_filter, correction_category=correction_category, include_checkpoints=include_checkpoints, + include_audit=include_audit, ) if chunk_id is not None: store = _get_vector_store() chunk = store.get_chunk(chunk_id) - if not include_checkpoints and chunk and chunk.get("chunk_origin") == CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT: + if ( + not include_checkpoints + and isinstance(chunk, dict) + and ( + chunk.get("chunk_origin") == CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT + or is_precompact_checkpoint_content(chunk.get("content")) + ) + ): empty = {"query": query, "total": 0, "results": []} return ([TextContent(type="text", text="No results found.")], empty) return await _context(chunk_id=chunk_id, before=before, after=after) if file_path is not None and _query_has_regression_signal(query): regression_result = await _regression(file_path=file_path, project=project) - recall_result = await _recall(file_path=file_path, project=project, max_results=max_results) + recall_result = await _recall( + file_path=file_path, + project=project, + max_results=max_results, + include_audit=include_audit, + ) merged_text = [] if isinstance(regression_result, list): merged_text.extend(regression_result) @@ -484,7 +509,12 @@ async def _brain_search( if file_path is not None: timeline = await _file_timeline(file_path=file_path, project=project, limit=50) - recall_result = await _recall(file_path=file_path, project=project, max_results=max_results) + recall_result = await _recall( + file_path=file_path, + project=project, + max_results=max_results, + include_audit=include_audit, + ) merged_text = [] if isinstance(timeline, list): merged_text.extend(timeline) @@ -514,11 +544,17 @@ async def _brain_search( source_filter=source_filter, correction_category=correction_category, include_checkpoints=include_checkpoints, + include_audit=include_audit, ) if _query_signals_current_context(query): ctx = await _current_context(hours=24) - think_result = await _think(context=query, project=project, max_results=max_results) + think_result = await _think( + context=query, + project=project, + max_results=max_results, + include_audit=include_audit, + ) merged_text = [] if isinstance(ctx, tuple): merged_text.extend(ctx[0]) @@ -531,10 +567,10 @@ async def _brain_search( return merged_text if _query_signals_think(query): - return await _think(context=query, project=project, max_results=max_results) + return await _think(context=query, project=project, max_results=max_results, include_audit=include_audit) if _query_signals_recall(query): - return await _recall(topic=query, project=project, max_results=max_results) + return await _recall(topic=query, project=project, max_results=max_results, include_audit=include_audit) store = _get_vector_store() exact_chunk_hit = _exact_chunk_lookup_result( @@ -553,6 +589,7 @@ async def _brain_search( source_filter=source_filter, correction_category=correction_category, include_checkpoints=include_checkpoints, + include_audit=include_audit, ) if exact_chunk_hit is not None: return exact_chunk_hit @@ -599,6 +636,7 @@ async def _brain_search( entity_name=entity_name, project_filter=normalized_project, include_checkpoints=include_checkpoints, + include_audit=include_audit, ) chunk_results = kg_results.get("chunks", {}) @@ -671,6 +709,7 @@ async def _brain_search( source_filter_like=source_filter, correction_category=correction_category, include_checkpoints=include_checkpoints, + include_audit=include_audit, ) @@ -868,6 +907,7 @@ async def _brain_recall( source_filter: str | None = None, correction_category: str | None = None, include_checkpoints: bool = False, + include_audit: bool = False, ): """Unified recall dispatcher -- routes to session/context/search/entity handlers. @@ -926,6 +966,7 @@ async def _brain_recall( source_filter=source_filter, correction_category=correction_category, include_checkpoints=include_checkpoints, + include_audit=include_audit, ) if resolved_mode == "entity": @@ -987,6 +1028,7 @@ async def _search( source_filter_like: str | None = None, correction_category: str | None = None, include_checkpoints: bool = False, + include_audit: bool = False, ): """Execute a hybrid search query (semantic + keyword via RRF). Retries on BusyError.""" try: @@ -1048,6 +1090,7 @@ async def _search( source_filter_like=source_filter_like, correction_category=correction_category, include_checkpoints=include_checkpoints, + include_audit=include_audit, ) break except Exception as e: @@ -1330,7 +1373,12 @@ async def _plan_links( return _error_result(f"Plan links error: {str(e)}") -async def _think(context: str, project: str | None = None, max_results: int = 10): +async def _think( + context: str, + project: str | None = None, + max_results: int = 10, + include_audit: bool = False, +): """Execute think -- retrieve relevant memories for current task.""" try: from ..engine import think @@ -1346,7 +1394,12 @@ def _embed(text: str) -> list[float]: result = await loop.run_in_executor( None, lambda: think( - context=context, store=store, embed_fn=_embed, project=normalized_project, max_results=max_results + context=context, + store=store, + embed_fn=_embed, + project=normalized_project, + max_results=max_results, + include_audit=include_audit, ), ) structured = { @@ -1363,7 +1416,11 @@ def _embed(text: str) -> list[float]: async def _recall( - file_path: str | None = None, topic: str | None = None, project: str | None = None, max_results: int = 10 + file_path: str | None = None, + topic: str | None = None, + project: str | None = None, + max_results: int = 10, + include_audit: bool = False, ): """Execute recall -- proactive context retrieval.""" try: @@ -1386,6 +1443,7 @@ def _embed(text: str) -> list[float]: topic=topic, project=normalized_project, max_results=max_results, + include_audit=include_audit, ), ) structured = { diff --git a/src/brainlayer/mcp/store_handler.py b/src/brainlayer/mcp/store_handler.py index 062852ef..6bff5c78 100644 --- a/src/brainlayer/mcp/store_handler.py +++ b/src/brainlayer/mcp/store_handler.py @@ -501,6 +501,7 @@ async def _store( """ try: if os.environ.get("BRAINLAYER_ARBITRATED") == "1": + from ..ingest_guard import reject_recursive_mcp_output from ..pipeline.classify import looks_like_system_prompt from ..search_repo import clear_hybrid_search_cache from ..store import VALID_MEMORY_TYPES @@ -510,6 +511,7 @@ async def _store( content = content.strip() if memory_type not in VALID_MEMORY_TYPES: raise ValueError(f"type must be one of: {', '.join(VALID_MEMORY_TYPES)}") + reject_recursive_mcp_output(content) if looks_like_system_prompt(content): raise ValueError("system prompt content is not stored in BrainLayer") _queue_store( diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index b1e055e0..98028fa3 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -17,6 +17,7 @@ from ._helpers import _escape_fts5_query, _is_sqlite_busy_error, serialize_f32 from .chunk_origin import CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT from .dedupe import resolve_chunk_id +from .ingest_guard import recursive_mcp_output_reason # ── hybrid_search result cache ─────────────────────────────────────────────── # Caches identical (store, query_text, filters) → results for 60s. @@ -40,6 +41,12 @@ "Grounding Results — Prompt", ] META_NOISE_PATTERNS_CASEFOLDED = [pattern.casefold() for pattern in META_NOISE_PATTERNS] +AUDIT_RECURSION_TAG_PATTERNS = ( + "{tag_expr} LIKE '%audit%'", + "{tag_expr} = 'r0x'", + "{tag_expr} = 'r02'", + "{tag_expr} GLOB 'r0[0-9]'", +) # Module-level LRU cache: {cache_key: (result, timestamp)} _hybrid_cache: "OrderedDict[tuple, tuple[dict, float]]" = OrderedDict() @@ -85,6 +92,7 @@ def _hybrid_cache_key( k: int, include_archived: bool = False, include_checkpoints: bool = False, + include_audit: bool = False, ) -> tuple: return ( store_key, @@ -106,6 +114,7 @@ def _hybrid_cache_key( k, include_archived, include_checkpoints, + include_audit, ) @@ -121,14 +130,107 @@ def _contains_meta_noise(content: Optional[str]) -> bool: return any(pattern in content_folded for pattern in META_NOISE_PATTERNS_CASEFOLDED) +def _audit_recursion_tag_predicate(tag_expr: str) -> str: + lowered = f"LOWER(CAST({tag_expr} AS TEXT))" + return "(" + " OR ".join(pattern.format(tag_expr=lowered) for pattern in AUDIT_RECURSION_TAG_PATTERNS) + ")" + + +def _audit_recursion_exclusion_sql(chunk_id_expr: str, tags_expr: str, *, use_chunk_tags: bool = True) -> str: + if use_chunk_tags: + tag_filter = ( + "NOT EXISTS (" + "SELECT 1 FROM chunk_tags audit_tags " + f"WHERE audit_tags.chunk_id = {chunk_id_expr} " + f"AND {_audit_recursion_tag_predicate('audit_tags.tag')}" + ")" + ) + else: + tags_json = f"CASE WHEN json_valid({tags_expr}) THEN {tags_expr} ELSE '[]' END" + tag_filter = ( + "NOT EXISTS (" + f"SELECT 1 FROM json_each({tags_json}) audit_tags " + f"WHERE {_audit_recursion_tag_predicate('audit_tags.value')}" + ")" + ) + + content_expr = ( + f"COALESCE(CAST({tags_expr.replace('.tags', '.content') if '.tags' in tags_expr else 'content'} AS TEXT), '')" + ) + recursive_content_filter = ( + f"LTRIM({content_expr}) NOT LIKE '┌─ brain_search:%' " + f"AND LOWER({content_expr}) NOT LIKE '%mcp brainlayer memory: invalid json-rpc message%' " + f"AND REPLACE(LOWER({content_expr}), ' ', '') NOT LIKE '%\"jsonrpc\":\"2.0\"%'" + ) + return f"({tag_filter} AND {recursive_content_filter})" + + +def _is_audit_recursion_metadata(meta: dict, content: str | None = None) -> bool: + if recursive_mcp_output_reason(content): + return True + tags = meta.get("tags") + if not isinstance(tags, list): + return False + for tag in tags: + normalized = str(tag).casefold() + if "audit" in normalized: + return True + if normalized in {"r02", "r0x"}: + return True + if len(normalized) == 3 and normalized[:2] == "r0" and normalized[2].isdigit(): + return True + return False + + +def _precompact_content_exclusion_sql(content_expr: str) -> str: + normalized = f"LOWER(LTRIM(COALESCE(CAST({content_expr} AS TEXT), '')))" + prefix = f"SUBSTR({normalized}, 1, 1024)" + return ( + f"{normalized} NOT LIKE '[precompact checkpoint]%' " + f"AND {normalized} NOT LIKE '# precompact checkpoint%' " + f"AND {normalized} NOT LIKE 'session-restore%' " + f"AND {normalized} NOT LIKE '# session-restore%' " + f"AND {prefix} NOT LIKE '%content=\"[precompact checkpoint]%' " + f"AND {prefix} NOT LIKE '%content=''[precompact checkpoint]%' " + f'AND {prefix} NOT LIKE \'%"content": "[precompact checkpoint]%\' ' + f"AND {prefix} NOT LIKE '%''content'': ''[precompact checkpoint]%'" + ) + + class SearchMixin: """Search and query methods, mixed into VectorStore.""" + def _audit_recursion_exclusion_sql(self, chunk_id_expr: str, tags_expr: str) -> str: + return _audit_recursion_exclusion_sql( + chunk_id_expr, + tags_expr, + use_chunk_tags=getattr(self, "_chunk_tags_available", True), + ) + + def _audit_recursion_count(self) -> int: + try: + row = ( + self._read_cursor() + .execute( + f""" + SELECT COUNT(*) FROM chunks + WHERE NOT ({self._audit_recursion_exclusion_sql("id", "tags")}) + """ + ) + .fetchone() + ) + return int(row[0]) if row else 0 + except (apsw.Error, TypeError, IndexError, ValueError): + return 0 + def _checkpoint_exclusion_clause(self, table_alias: str | None = None) -> str | None: if not getattr(self, "_has_chunk_origin", True): return None column = f"{table_alias}.chunk_origin" if table_alias else "chunk_origin" - return f"COALESCE({column}, 'unknown') != 'precompact_checkpoint'" + content_column = f"{table_alias}.content" if table_alias else "content" + return ( + f"(COALESCE({column}, 'unknown') != 'precompact_checkpoint' " + f"AND {_precompact_content_exclusion_sql(content_column)})" + ) def _checkpoint_cache_data_version(self) -> int | None: for attempt in range(3): @@ -157,7 +259,11 @@ def _checkpoint_filtered_knn_k(self, n_results: int, include_checkpoints: bool) checkpoint_count = int( self._read_cursor() .execute( - "SELECT COUNT(*) FROM chunks WHERE chunk_origin = ?", + f""" + SELECT COUNT(*) FROM chunks + WHERE chunk_origin = ? + OR NOT ({_precompact_content_exclusion_sql("content")}) + """, (CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT,), ) .fetchone()[0] @@ -357,6 +463,7 @@ def search( include_checkpoints: bool = False, source_filter_like: Optional[str] = None, correction_category: Optional[str] = None, + include_audit: bool = False, ) -> Dict[str, List]: """Search chunks by embedding or text. @@ -418,6 +525,8 @@ def search( if correction_category: where_clauses.append("c.id IN (SELECT chunk_id FROM chunk_tags WHERE tag LIKE ?)") filter_params.append(f"correction:{correction_category}%") + if not include_audit: + where_clauses.append(self._audit_recursion_exclusion_sql("c.id", "c.tags")) if not include_checkpoints: checkpoint_clause = self._checkpoint_exclusion_clause("c") if checkpoint_clause: @@ -440,6 +549,7 @@ def search( or (source_filter and source_filter != "claude_code") or source_filter_like or correction_category + or (not include_audit and self._audit_recursion_count() > 0) ) effective_k = self._effective_knn_k(n_results, bool(needs_overfetch), include_checkpoints) params = [query_bytes, effective_k] + filter_params @@ -502,6 +612,8 @@ def search( if correction_category: where_clauses.append("id IN (SELECT chunk_id FROM chunk_tags WHERE tag LIKE ?)") params.append(f"correction:{correction_category}%") + if not include_audit: + where_clauses.append(self._audit_recursion_exclusion_sql("id", "tags")) if not include_checkpoints: checkpoint_clause = self._checkpoint_exclusion_clause() if checkpoint_clause: @@ -714,6 +826,7 @@ def _binary_search( include_checkpoints: bool = False, source_filter_like: Optional[str] = None, correction_category: Optional[str] = None, + include_audit: bool = False, ) -> Dict[str, List]: """Run KNN search against binary-quantized vectors.""" cursor = self._read_cursor() @@ -764,6 +877,8 @@ def _binary_search( if correction_category: where_clauses.append("c.id IN (SELECT chunk_id FROM chunk_tags WHERE tag LIKE ?)") filter_params.append(f"correction:{correction_category}%") + if not include_audit: + where_clauses.append(self._audit_recursion_exclusion_sql("c.id", "c.tags")) if not include_checkpoints: checkpoint_clause = self._checkpoint_exclusion_clause("c") if checkpoint_clause: @@ -778,7 +893,11 @@ def _binary_search( where_sql = "AND " + " AND ".join(where_clauses) needs_overfetch = ( - entity_id or (source_filter and source_filter != "claude_code") or source_filter_like or correction_category + entity_id + or (source_filter and source_filter != "claude_code") + or source_filter_like + or correction_category + or (not include_audit and self._audit_recursion_count() > 0) ) effective_k = self._effective_knn_k(n_results, bool(needs_overfetch), include_checkpoints) params = [query_bytes, effective_k] + filter_params @@ -924,6 +1043,7 @@ def hybrid_search( source_filter_like: Optional[str] = None, correction_category: Optional[str] = None, filter_meta_noise: bool = True, + include_audit: bool = False, ) -> Dict[str, List]: """Hybrid search combining semantic (vector) + keyword (FTS5) via Reciprocal Rank Fusion. @@ -958,6 +1078,7 @@ def hybrid_search( k, include_archived, include_checkpoints, + include_audit, ) + (fts_query_override, kg_boost, source_filter_like, correction_category, filter_meta_noise) now = time.monotonic() if cache_key in _hybrid_cache: @@ -993,6 +1114,7 @@ def hybrid_search( include_checkpoints=include_checkpoints, source_filter_like=source_filter_like, correction_category=correction_category, + include_audit=include_audit, ) semantic = self._rerank_binary_results_with_float(query_embedding, semantic) else: @@ -1015,6 +1137,7 @@ def hybrid_search( include_checkpoints=include_checkpoints, source_filter_like=source_filter_like, correction_category=correction_category, + include_audit=include_audit, ) # Build semantic rank map: chunk_content -> rank @@ -1075,6 +1198,8 @@ def hybrid_search( if correction_category: fts_extra.append("AND c.id IN (SELECT chunk_id FROM chunk_tags WHERE tag LIKE ?)") fts_filter_params.append(f"correction:{correction_category}%") + if not include_audit: + fts_extra.append(f"AND {self._audit_recursion_exclusion_sql('c.id', 'c.tags')}") if not include_checkpoints: checkpoint_clause = self._checkpoint_exclusion_clause("c") if checkpoint_clause: @@ -1222,6 +1347,8 @@ def _ingest_keyword_rows(rows: list[tuple], ranks: dict[str, int]) -> None: if filter_meta_noise and _contains_meta_noise(doc): continue + if not include_audit and _is_audit_recursion_metadata(meta, doc): + continue # Apply filters to FTS-only results if fts_rank is not None and sem_entry is None: diff --git a/src/brainlayer/store.py b/src/brainlayer/store.py index 1e1b815a..b25ae2db 100644 --- a/src/brainlayer/store.py +++ b/src/brainlayer/store.py @@ -41,6 +41,7 @@ from .chunk_origin import CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT, detect_chunk_origin from .dedupe import find_duplicate, merge_duplicate_chunk +from .ingest_guard import reject_recursive_mcp_output from .pipeline.classify import looks_like_system_prompt from .vector_store import VectorStore @@ -106,6 +107,7 @@ def store_memory( raise ValueError(f"type must be one of: {', '.join(VALID_MEMORY_TYPES)}") content = content.strip() + reject_recursive_mcp_output(content) if looks_like_system_prompt(content): raise ValueError("system prompt content is not stored in BrainLayer") diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index ad787c8e..e638da0b 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -48,6 +48,7 @@ merge_existing_chunk_seen, resolve_chunk_id, ) +from .ingest_guard import reject_recursive_mcp_output from .kg_repo import KGMixin from .search_repo import SearchMixin from .session_repo import SessionMixin @@ -113,6 +114,7 @@ def _init_readonly_db(self) -> None: self._has_chunk_origin = "chunk_origin" in chunk_columns self._binary_index_available = "chunk_vectors_binary" in existing_tables self._trigram_fts_available = "chunks_fts_trigram" in existing_tables + self._chunk_tags_available = "chunk_tags" in existing_tables self._local = threading.local() def _invalidate_checkpoint_count_cache(self) -> None: @@ -522,6 +524,7 @@ def _init_db(self) -> None: SELECT c.id, j.value FROM chunks c, json_each(c.tags) j WHERE c.tags IS NOT NULL AND json_valid(c.tags) = 1 """) + self._chunk_tags_available = True # Session context table cursor.execute(""" @@ -1358,6 +1361,7 @@ def upsert_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[floa transaction_started = True for chunk, embedding in zip(chunks, embeddings): chunk_id = chunk["id"] + reject_recursive_mcp_output(chunk.get("content")) created_at = chunk.get("created_at") tags_value = chunk.get("tags") tags_json = json.dumps(tags_value) if isinstance(tags_value, (list, dict)) else tags_value diff --git a/src/brainlayer/watcher_bridge.py b/src/brainlayer/watcher_bridge.py index 8f9468b6..7f80ba63 100644 --- a/src/brainlayer/watcher_bridge.py +++ b/src/brainlayer/watcher_bridge.py @@ -23,6 +23,7 @@ from .chunk_origin import detect_chunk_origin from .dedupe import find_duplicate, merge_duplicate_chunk, merge_existing_chunk_seen, normalized_exact_hash +from .ingest_guard import recursive_mcp_output_reason from .paths import get_db_path from .pipeline.chunk import chunk_content from .pipeline.classify import classify_content @@ -127,6 +128,9 @@ def should_skip_entry(entry: dict) -> str | None: if len(raw_text.strip()) < MIN_RAW_CONTENT_LENGTH: return "too_short" + if recursive_mcp_output_reason(raw_text): + return "recursive_mcp_output" + # Skip if content is mostly system-reminder injection cleaned = _strip_system_reminders(raw_text) if len(cleaned.strip()) < MIN_RAW_CONTENT_LENGTH: @@ -142,6 +146,9 @@ def should_skip_chunk_content(content: str) -> str | None: if len(cleaned.strip()) < MIN_RAW_CONTENT_LENGTH: return "system_reminder_residue" + if recursive_mcp_output_reason(cleaned): + return "recursive_mcp_output" + # Skip pure file deletion diffs if _is_pure_deletion_diff(cleaned): return "pure_deletion_diff" diff --git a/tests/test_audit_recursion_filter.py b/tests/test_audit_recursion_filter.py new file mode 100644 index 00000000..7aa825e9 --- /dev/null +++ b/tests/test_audit_recursion_filter.py @@ -0,0 +1,288 @@ +import json + +from brainlayer._helpers import serialize_f32 +from brainlayer.engine import recall, think +from brainlayer.vector_store import VectorStore + + +def _insert_chunk(store: VectorStore, chunk_id: str, content: str, tags: list[str], embedding: list[float]) -> None: + cursor = store.conn.cursor() + cursor.execute( + """INSERT INTO chunks (id, content, metadata, source_file, project, + content_type, char_count, source, tags) + VALUES (?, ?, '{}', 'audit-filter-test.jsonl', 'brainlayer', + 'assistant_text', ?, 'claude_code', ?)""", + (chunk_id, content, len(content), json.dumps(tags)), + ) + cursor.execute( + "INSERT INTO chunk_vectors (chunk_id, embedding) VALUES (?, ?)", + (chunk_id, serialize_f32(embedding)), + ) + cursor.executemany( + "INSERT OR IGNORE INTO chunk_tags (chunk_id, tag) VALUES (?, ?)", + [(chunk_id, tag) for tag in tags], + ) + + +def test_hybrid_search_excludes_audit_recursion_by_default(tmp_path): + store = VectorStore(tmp_path / "audit-filter.db") + try: + query_embedding = [0.01] * 1024 + _insert_chunk( + store, + "audit-recursion-source", + "why restart BrainBar audit recursion contamination exact match", + ["r02", "audit"], + query_embedding, + ) + _insert_chunk( + store, + "ordinary-brainbar-memory", + "why restart BrainBar because launchd replaced the old degraded binary", + ["brainbar", "reliability"], + [0.02] * 1024, + ) + store.build_binary_index() + + default_results = store.hybrid_search( + query_embedding=query_embedding, + query_text="why restart BrainBar", + n_results=3, + ) + default_ids = default_results["ids"][0] + + assert "audit-recursion-source" not in default_ids + assert "ordinary-brainbar-memory" in default_ids + + audit_results = store.hybrid_search( + query_embedding=query_embedding, + query_text="why restart BrainBar", + n_results=3, + include_audit=True, + ) + + assert "audit-recursion-source" in audit_results["ids"][0] + finally: + store.close() + + +def test_hybrid_search_does_not_exclude_r0x_substrings_inside_normal_tags(tmp_path): + store = VectorStore(tmp_path / "audit-filter-substring.db") + try: + query_embedding = [0.03] * 1024 + _insert_chunk( + store, + "ordinary-mirror07-memory", + "mirror07 normal operational memory should remain searchable", + ["mirror07", "reliability"], + query_embedding, + ) + store.build_binary_index() + + results = store.hybrid_search( + query_embedding=query_embedding, + query_text="mirror07 normal operational memory", + n_results=3, + ) + + assert "ordinary-mirror07-memory" in results["ids"][0] + finally: + store.close() + + +def test_readonly_legacy_db_without_chunk_tags_uses_json_tag_fallback(tmp_path): + db_path = tmp_path / "legacy-readonly-audit-filter.db" + store = VectorStore(db_path) + try: + query_embedding = [0.04] * 1024 + _insert_chunk( + store, + "legacy-audit-source", + "legacy readonly audit memory should be filtered", + ["r02", "audit"], + query_embedding, + ) + _insert_chunk( + store, + "legacy-ordinary-memory", + "legacy readonly ordinary memory should be searchable", + ["brainbar", "reliability"], + [0.05] * 1024, + ) + store.build_binary_index() + cursor = store.conn.cursor() + for trigger in ( + "chunk_tags_insert", + "chunk_tags_update", + "chunk_tags_update_clear", + "chunk_tags_delete", + ): + cursor.execute(f"DROP TRIGGER IF EXISTS {trigger}") + cursor.execute("DROP TABLE chunk_tags") + finally: + store.close() + + db_path.chmod(0o444) + readonly_store = VectorStore(db_path) + try: + assert readonly_store._chunk_tags_available is False + results = readonly_store.hybrid_search( + query_embedding=query_embedding, + query_text="legacy readonly memory", + n_results=3, + ) + ids = results["ids"][0] + assert "legacy-audit-source" not in ids + assert "legacy-ordinary-memory" in ids + finally: + readonly_store.close() + db_path.chmod(0o644) + + +def test_hybrid_search_overfetches_when_audit_chunks_dominate_knn(tmp_path): + store = VectorStore(tmp_path / "audit-filter-overfetch.db") + try: + query_embedding = [0.06] * 1024 + for index in range(60): + _insert_chunk( + store, + f"audit-neighbor-{index}", + f"audit recursion neighbor {index}", + ["r02", "audit"], + query_embedding, + ) + _insert_chunk( + store, + "ordinary-after-audit-neighbors", + "ordinary BrainBar restart decision should survive audit-heavy nearest neighbors", + ["brainbar", "reliability"], + [0.061] * 1024, + ) + store.build_binary_index() + + results = store.hybrid_search( + query_embedding=query_embedding, + query_text="ordinary BrainBar restart decision", + n_results=3, + ) + + assert "ordinary-after-audit-neighbors" in results["ids"][0] + assert all(not chunk_id.startswith("audit-neighbor-") for chunk_id in results["ids"][0]) + finally: + store.close() + + +def test_exact_r0x_tag_is_filtered_as_audit_shorthand(tmp_path): + store = VectorStore(tmp_path / "audit-filter-r0x.db") + try: + query_embedding = [0.07] * 1024 + _insert_chunk( + store, + "audit-r0x-source", + "r0x audit shorthand memory should be filtered", + ["r0x"], + query_embedding, + ) + _insert_chunk( + store, + "ordinary-r0x-control", + "ordinary control memory should remain searchable", + ["brainbar"], + [0.071] * 1024, + ) + store.build_binary_index() + + results = store.hybrid_search( + query_embedding=query_embedding, + query_text="ordinary control memory", + n_results=3, + ) + + assert "audit-r0x-source" not in results["ids"][0] + assert "ordinary-r0x-control" in results["ids"][0] + finally: + store.close() + + +def test_recursive_mcp_output_content_is_filtered_even_without_audit_tags(tmp_path): + store = VectorStore(tmp_path / "recursive-mcp-output-filter.db") + try: + query_embedding = [0.08] * 1024 + _insert_chunk( + store, + "recursive-jsonrpc-output", + 'MCP BrainLayer Memory: Invalid JSON-RPC message: {"jsonrpc":"2.0","id":24}', + ["correction:factual", "auto-detected"], + query_embedding, + ) + _insert_chunk( + store, + "recursive-brain-search-box", + '┌─ brain_search: "BrainLayer audit recursion" ─ 1 result\n│ recursive output', + ["auto-detected"], + query_embedding, + ) + _insert_chunk( + store, + "ordinary-mcp-memory", + "BrainLayer MCP timeout investigation produced a real operational fix", + ["brainlayer", "mcp"], + [0.081] * 1024, + ) + store.build_binary_index() + + results = store.hybrid_search( + query_embedding=query_embedding, + query_text="BrainLayer MCP", + n_results=5, + ) + ids = results["ids"][0] + + assert "ordinary-mcp-memory" in ids + assert "recursive-jsonrpc-output" not in ids + assert "recursive-brain-search-box" not in ids + + audit_results = store.hybrid_search( + query_embedding=query_embedding, + query_text="BrainLayer MCP", + n_results=5, + include_audit=True, + ) + + assert "recursive-jsonrpc-output" in audit_results["ids"][0] + assert "recursive-brain-search-box" in audit_results["ids"][0] + finally: + store.close() + + +def test_engine_think_and_recall_forward_include_audit(): + class MockStore: + def __init__(self): + self.calls = [] + + def hybrid_search(self, **kwargs): + self.calls.append(kwargs) + return { + "documents": [["ordinary memory"]], + "metadatas": [[{"intent": "decision", "project": "brainlayer"}]], + } + + def get_file_timeline(self, *_args, **_kwargs): + return [] + + mock_store = MockStore() + think( + "think about audit history", + store=mock_store, + embed_fn=lambda _text: [0.1] * 1024, + include_audit=True, + ) + recall( + store=mock_store, + embed_fn=lambda _text: [0.1] * 1024, + topic="audit history", + include_audit=True, + ) + + assert mock_store.calls[0]["include_audit"] is True + assert mock_store.calls[1]["include_audit"] is True diff --git a/tests/test_ingest_guard.py b/tests/test_ingest_guard.py new file mode 100644 index 00000000..5d91fb29 --- /dev/null +++ b/tests/test_ingest_guard.py @@ -0,0 +1,113 @@ +import sqlite3 + +import pytest + +from brainlayer.drain import drain_once +from brainlayer.queue_io import enqueue_hook_chunk, enqueue_store, enqueue_watcher_chunk +from brainlayer.store import store_memory +from brainlayer.vector_store import VectorStore +from brainlayer.watcher_bridge import should_skip_chunk_content, should_skip_entry + +JSONRPC_RECURSION_CONTENT = ( + 'MCP BrainLayer Memory: Invalid JSON-RPC message: {"jsonrpc":"2.0","id":24,' + '"result":{"content":[{"type":"text","text":"recursive output"}]}}' +) + + +def _make_entry(text: str) -> dict: + return { + "type": "assistant", + "message": {"content": [{"type": "text", "text": text}]}, + "timestamp": "2026-05-16T12:00:00Z", + } + + +def test_watcher_preclassify_rejects_brain_search_output_box(): + entry = _make_entry('┌─ brain_search: "audit recursion" ─ 1 result\n│\n└─') + + assert should_skip_entry(entry) == "recursive_mcp_output" + + +def test_watcher_postchunk_rejects_jsonrpc_mcp_memory_output(): + assert should_skip_chunk_content(JSONRPC_RECURSION_CONTENT) == "recursive_mcp_output" + + +def test_direct_store_rejects_recursive_mcp_output(tmp_path): + store = VectorStore(tmp_path / "store-guard.db") + + with pytest.raises(ValueError, match="recursive MCP output"): + store_memory( + store=store, + embed_fn=None, + content=JSONRPC_RECURSION_CONTENT, + memory_type="note", + project="brainlayer", + tags=["correction:factual", "auto-detected"], + ) + + store.close() + + +def test_vector_upsert_rejects_recursive_mcp_output(tmp_path): + store = VectorStore(tmp_path / "upsert-guard.db") + + with pytest.raises(ValueError, match="recursive MCP output"): + store.upsert_chunks( + [ + { + "id": "rt-guarded-jsonrpc", + "content": JSONRPC_RECURSION_CONTENT, + "metadata": {}, + "source_file": "session.jsonl", + "project": "brainlayer", + "content_type": "assistant_text", + "char_count": len(JSONRPC_RECURSION_CONTENT), + } + ], + [[0.1] * 1024], + ) + + store.close() + + +def test_drain_drops_recursive_mcp_output_events(tmp_path, monkeypatch): + db_path = tmp_path / "drain-guard.db" + queue_dir = tmp_path / "queue" + VectorStore(db_path).close() + monkeypatch.setenv("BRAINLAYER_DRAIN_EMBED", "0") + + enqueue_store( + chunk_id="queued-store-recursion", + content=JSONRPC_RECURSION_CONTENT, + project="brainlayer", + tags=["correction:factual", "auto-detected"], + queue_dir=queue_dir, + ) + enqueue_watcher_chunk( + chunk_id="queued-watcher-recursion", + content='{"jsonrpc":"2.0","id":24,"result":"recursive watcher output"}', + metadata={}, + source_file="session.jsonl", + project="brainlayer", + content_type="assistant_text", + value_type="HIGH", + created_at="2026-05-16T12:00:00Z", + conversation_id="session", + tags=["auto-detected"], + queue_dir=queue_dir, + ) + enqueue_hook_chunk( + session_id="33abe108-session", + content="MCP BrainLayer Memory: Invalid JSON-RPC message: recursive hook output", + project="brainlayer", + queue_dir=queue_dir, + ) + + drained = drain_once(db_path=db_path, queue_dir=queue_dir, batch_size=10, log_path=tmp_path / "drain.log") + + with sqlite3.connect(db_path) as conn: + rows = conn.execute("SELECT id, content FROM chunks").fetchall() + + assert drained == 3 + assert rows == [] + assert not list(queue_dir.glob("*.jsonl")) diff --git a/tests/test_precompact_chunk_origin.py b/tests/test_precompact_chunk_origin.py index e1e41795..3857f631 100644 --- a/tests/test_precompact_chunk_origin.py +++ b/tests/test_precompact_chunk_origin.py @@ -338,6 +338,44 @@ def test_search_excludes_checkpoints_by_default_and_can_include_them(tmp_path): assert {"checkpoint", "normal"}.issubset(set(hybrid_with_checkpoints["ids"][0])) +def test_search_excludes_checkpoint_content_even_when_origin_backfill_missed_it(tmp_path): + _hybrid_cache.clear() + store = VectorStore(tmp_path / "search-missed-origin.db") + _insert_chunk( + store, + chunk_id="missed-origin-checkpoint", + content="[PreCompact checkpoint]\nCurrent task: should not leak through default search", + chunk_origin=CHUNK_ORIGIN_UNKNOWN, + ) + _insert_chunk( + store, + chunk_id="normal-missed-origin-control", + content="Current task memory that should remain visible in default search", + chunk_origin=CHUNK_ORIGIN_UNKNOWN, + ) + store.build_binary_index() + + text_default = store.search(query_text="Current task", n_results=10) + hybrid_default = store.hybrid_search( + query_embedding=_embed("Current task"), + query_text="Current task", + n_results=10, + ) + text_with_checkpoints = store.search(query_text="Current task", n_results=10, include_checkpoints=True) + hybrid_with_checkpoints = store.hybrid_search( + query_embedding=_embed("Current task"), + query_text="Current task", + n_results=10, + include_checkpoints=True, + ) + store.close() + + assert text_default["ids"][0] == ["normal-missed-origin-control"] + assert "missed-origin-checkpoint" not in hybrid_default["ids"][0] + assert {"missed-origin-checkpoint", "normal-missed-origin-control"}.issubset(set(text_with_checkpoints["ids"][0])) + assert {"missed-origin-checkpoint", "normal-missed-origin-control"}.issubset(set(hybrid_with_checkpoints["ids"][0])) + + def test_vector_search_overfetches_when_checkpoint_filter_discards_nearest_neighbors(tmp_path): store = VectorStore(tmp_path / "vector-overfetch.db") query_embedding = [0.0] * 1024 diff --git a/tests/test_search_exact_chunk_id.py b/tests/test_search_exact_chunk_id.py index e81de0bc..44995b0b 100644 --- a/tests/test_search_exact_chunk_id.py +++ b/tests/test_search_exact_chunk_id.py @@ -92,7 +92,7 @@ async def test_brain_search_exact_checkpoint_chunk_id_returns_empty_without_fall ): content, structured = await _brain_search(query=chunk_id, detail="compact") - assert content[0].text == "No results found." + assert "No results found." in content[0].text assert structured == {"query": chunk_id, "total": 0, "results": []} @@ -106,7 +106,8 @@ def test_exact_chunk_lookup_skips_lifecycle_managed_chunks(): "archived_at": "2026-04-30T09:15:00Z", } - assert _exact_chunk_lookup_result("brainbar-archived01", mock_store, "compact") is None + _, structured = _exact_chunk_lookup_result("brainbar-archived01", mock_store, "compact") + assert structured["total"] == 0 @pytest.mark.asyncio @@ -162,8 +163,9 @@ async def test_brain_search_exact_chunk_id_respects_project_scope(): ): result = await _brain_search(query=chunk_id, project="brainlayer", detail="compact") - assert result == (["fallback"], {"total": 0, "results": []}) - search_mock.assert_awaited_once() + _, structured = result + assert structured["total"] == 0 + search_mock.assert_not_awaited() @pytest.mark.asyncio From 16ef4328dc6e2a4c48d6480a8d50dfd25c0321c5 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 16:35:14 +0300 Subject: [PATCH 02/15] fix: address recursive audit review gaps --- src/brainlayer/mcp/search_handler.py | 32 +++++++++---- src/brainlayer/search_repo.py | 54 ++++++++++++++++------ tests/test_audit_recursion_filter.py | 67 ++++++++++++++++++++++++++++ tests/test_ingest_guard.py | 58 +++++++++++------------- tests/test_search_exact_chunk_id.py | 66 +++++++++++++++++++++++++++ 5 files changed, 223 insertions(+), 54 deletions(-) diff --git a/src/brainlayer/mcp/search_handler.py b/src/brainlayer/mcp/search_handler.py index 145db69d..4e0d3c60 100644 --- a/src/brainlayer/mcp/search_handler.py +++ b/src/brainlayer/mcp/search_handler.py @@ -45,6 +45,21 @@ def _empty_exact_chunk_lookup_result(query: str) -> tuple[list[TextContent], dic return ([TextContent(type="text", text=format_search_results(query, [], 0))], structured) +def _parsed_chunk_tags(chunk: dict[str, Any]) -> list[Any]: + tags = chunk.get("tags") + if not tags: + return [] + if isinstance(tags, list): + return tags + if isinstance(tags, str): + try: + parsed = json.loads(tags) + except (json.JSONDecodeError, TypeError): + return [] + return parsed if isinstance(parsed, list) else [] + return [] + + def _utcnow_iso() -> str: return datetime.now(timezone.utc).isoformat() @@ -194,7 +209,7 @@ def _exact_chunk_lookup_result( ): return _empty_exact_chunk_lookup_result(query) if any(value is not None for value in (source, intent, sentiment, source_filter, correction_category)): - return _empty_exact_chunk_lookup_result(query) + return None if project is not None: chunk_project = _normalize_project_name(chunk.get("project")) or chunk.get("project") normalized_project = _normalize_project_name(project) or project @@ -203,13 +218,7 @@ def _exact_chunk_lookup_result( if content_type is not None and chunk.get("content_type") != content_type: return _empty_exact_chunk_lookup_result(query) - tags = chunk.get("tags") - parsed_tags = None - if tags: - try: - parsed_tags = json.loads(tags) if isinstance(tags, str) else tags - except (json.JSONDecodeError, TypeError): - parsed_tags = None + parsed_tags = _parsed_chunk_tags(chunk) if tag is not None and tag not in (parsed_tags or []): return _empty_exact_chunk_lookup_result(query) if not include_audit and _is_audit_recursion_metadata({"tags": parsed_tags or []}, chunk.get("content")): @@ -488,6 +497,13 @@ async def _brain_search( ): empty = {"query": query, "total": 0, "results": []} return ([TextContent(type="text", text="No results found.")], empty) + if ( + not include_audit + and isinstance(chunk, dict) + and _is_audit_recursion_metadata({"tags": _parsed_chunk_tags(chunk)}, chunk.get("content")) + ): + empty = {"query": query, "total": 0, "results": []} + return ([TextContent(type="text", text="No results found.")], empty) return await _context(chunk_id=chunk_id, before=before, after=after) if file_path is not None and _query_has_regression_signal(query): diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index 98028fa3..63739192 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -156,10 +156,14 @@ def _audit_recursion_exclusion_sql(chunk_id_expr: str, tags_expr: str, *, use_ch content_expr = ( f"COALESCE(CAST({tags_expr.replace('.tags', '.content') if '.tags' in tags_expr else 'content'} AS TEXT), '')" ) + compact_content_expr = ( + f"REPLACE(REPLACE(REPLACE(REPLACE(LOWER({content_expr}), ' ', ''), " + "char(9), ''), char(10), ''), char(13), '')" + ) recursive_content_filter = ( f"LTRIM({content_expr}) NOT LIKE '┌─ brain_search:%' " f"AND LOWER({content_expr}) NOT LIKE '%mcp brainlayer memory: invalid json-rpc message%' " - f"AND REPLACE(LOWER({content_expr}), ' ', '') NOT LIKE '%\"jsonrpc\":\"2.0\"%'" + f"AND {compact_content_expr} NOT LIKE '%\"jsonrpc\":\"2.0\"%'" ) return f"({tag_filter} AND {recursive_content_filter})" @@ -207,20 +211,42 @@ def _audit_recursion_exclusion_sql(self, chunk_id_expr: str, tags_expr: str) -> ) def _audit_recursion_count(self) -> int: - try: - row = ( - self._read_cursor() - .execute( - f""" - SELECT COUNT(*) FROM chunks - WHERE NOT ({self._audit_recursion_exclusion_sql("id", "tags")}) - """ + cached_count = getattr(self, "_audit_recursion_count_cache", None) + current_data_version = self._checkpoint_cache_data_version() + cached_data_version = getattr(self, "_audit_recursion_count_cache_data_version", None) + if cached_count is not None and (current_data_version is None or cached_data_version == current_data_version): + return int(cached_count) + + for attempt in range(3): + try: + row = ( + self._read_cursor() + .execute( + f""" + SELECT COUNT(*) FROM chunks + WHERE NOT ({self._audit_recursion_exclusion_sql("id", "tags")}) + """ + ) + .fetchone() ) - .fetchone() - ) - return int(row[0]) if row else 0 - except (apsw.Error, TypeError, IndexError, ValueError): - return 0 + audit_count = int(row[0]) if row else 0 + setattr(self, "_audit_recursion_count_cache", audit_count) + setattr(self, "_audit_recursion_count_cache_data_version", current_data_version) + return audit_count + except apsw.Error as exc: + if _is_sqlite_busy_error(exc) and attempt < 2: + time.sleep(0.05 * (2**attempt)) + continue + if cached_count is not None: + return int(cached_count) + return 0 + except (TypeError, IndexError, ValueError): + if cached_count is not None: + return int(cached_count) + return 0 + if cached_count is not None: + return int(cached_count) + return 0 def _checkpoint_exclusion_clause(self, table_alias: str | None = None) -> str | None: if not getattr(self, "_has_chunk_origin", True): diff --git a/tests/test_audit_recursion_filter.py b/tests/test_audit_recursion_filter.py index 7aa825e9..be3cfa29 100644 --- a/tests/test_audit_recursion_filter.py +++ b/tests/test_audit_recursion_filter.py @@ -1,5 +1,7 @@ import json +import apsw + from brainlayer._helpers import serialize_f32 from brainlayer.engine import recall, think from brainlayer.vector_store import VectorStore @@ -172,6 +174,36 @@ def test_hybrid_search_overfetches_when_audit_chunks_dominate_knn(tmp_path): store.close() +def test_audit_recursion_count_uses_cached_value_on_busy_retry_exhaustion(tmp_path, monkeypatch): + store = VectorStore(tmp_path / "audit-filter-count-cache.db") + try: + query_embedding = [0.065] * 1024 + _insert_chunk( + store, + "audit-count-source", + "audit count source", + ["audit"], + query_embedding, + ) + + assert store._audit_recursion_count() == 1 + + attempts = 0 + + def busy_read_cursor(): + nonlocal attempts + attempts += 1 + raise apsw.BusyError("database is locked") + + monkeypatch.setattr(store, "_checkpoint_cache_data_version", lambda: 999) + monkeypatch.setattr(store, "_read_cursor", busy_read_cursor) + + assert store._audit_recursion_count() == 1 + assert attempts == 3 + finally: + store.close() + + def test_exact_r0x_tag_is_filtered_as_audit_shorthand(tmp_path): store = VectorStore(tmp_path / "audit-filter-r0x.db") try: @@ -255,6 +287,41 @@ def test_recursive_mcp_output_content_is_filtered_even_without_audit_tags(tmp_pa store.close() +def test_formatted_jsonrpc_content_is_filtered_by_sql_paths(tmp_path): + store = VectorStore(tmp_path / "recursive-formatted-jsonrpc-filter.db") + try: + query_embedding = [0.09] * 1024 + _insert_chunk( + store, + "recursive-formatted-jsonrpc-output", + 'MCP output payload: {"jsonrpc" :\n\t"2.0", "id": 24}', + ["auto-detected"], + query_embedding, + ) + _insert_chunk( + store, + "ordinary-formatted-jsonrpc-control", + "BrainLayer MCP guard should keep ordinary results visible", + ["brainlayer", "mcp"], + [0.091] * 1024, + ) + store.build_binary_index() + + vector_results = store.search(query_embedding=query_embedding, n_results=5) + hybrid_results = store.hybrid_search( + query_embedding=query_embedding, + query_text="BrainLayer MCP guard", + n_results=5, + ) + + assert "recursive-formatted-jsonrpc-output" not in vector_results["ids"][0] + assert "ordinary-formatted-jsonrpc-control" in vector_results["ids"][0] + assert "recursive-formatted-jsonrpc-output" not in hybrid_results["ids"][0] + assert "ordinary-formatted-jsonrpc-control" in hybrid_results["ids"][0] + finally: + store.close() + + def test_engine_think_and_recall_forward_include_audit(): class MockStore: def __init__(self): diff --git a/tests/test_ingest_guard.py b/tests/test_ingest_guard.py index 5d91fb29..a7f18445 100644 --- a/tests/test_ingest_guard.py +++ b/tests/test_ingest_guard.py @@ -33,41 +33,35 @@ def test_watcher_postchunk_rejects_jsonrpc_mcp_memory_output(): def test_direct_store_rejects_recursive_mcp_output(tmp_path): - store = VectorStore(tmp_path / "store-guard.db") - - with pytest.raises(ValueError, match="recursive MCP output"): - store_memory( - store=store, - embed_fn=None, - content=JSONRPC_RECURSION_CONTENT, - memory_type="note", - project="brainlayer", - tags=["correction:factual", "auto-detected"], - ) - - store.close() + with VectorStore(tmp_path / "store-guard.db") as store: + with pytest.raises(ValueError, match="recursive MCP output"): + store_memory( + store=store, + embed_fn=None, + content=JSONRPC_RECURSION_CONTENT, + memory_type="note", + project="brainlayer", + tags=["correction:factual", "auto-detected"], + ) def test_vector_upsert_rejects_recursive_mcp_output(tmp_path): - store = VectorStore(tmp_path / "upsert-guard.db") - - with pytest.raises(ValueError, match="recursive MCP output"): - store.upsert_chunks( - [ - { - "id": "rt-guarded-jsonrpc", - "content": JSONRPC_RECURSION_CONTENT, - "metadata": {}, - "source_file": "session.jsonl", - "project": "brainlayer", - "content_type": "assistant_text", - "char_count": len(JSONRPC_RECURSION_CONTENT), - } - ], - [[0.1] * 1024], - ) - - store.close() + with VectorStore(tmp_path / "upsert-guard.db") as store: + with pytest.raises(ValueError, match="recursive MCP output"): + store.upsert_chunks( + [ + { + "id": "rt-guarded-jsonrpc", + "content": JSONRPC_RECURSION_CONTENT, + "metadata": {}, + "source_file": "session.jsonl", + "project": "brainlayer", + "content_type": "assistant_text", + "char_count": len(JSONRPC_RECURSION_CONTENT), + } + ], + [[0.1] * 1024], + ) def test_drain_drops_recursive_mcp_output_events(tmp_path, monkeypatch): diff --git a/tests/test_search_exact_chunk_id.py b/tests/test_search_exact_chunk_id.py index 44995b0b..4088bffb 100644 --- a/tests/test_search_exact_chunk_id.py +++ b/tests/test_search_exact_chunk_id.py @@ -110,6 +110,23 @@ def test_exact_chunk_lookup_skips_lifecycle_managed_chunks(): assert structured["total"] == 0 +def test_exact_chunk_lookup_defers_when_unhandled_filters_are_active(): + """Exact lookup should fall through so the normal search path can evaluate filters.""" + mock_store = MagicMock() + mock_store.get_chunk.return_value = { + "id": "brainbar-filter01", + "content": "Filterable chunk", + "project": "brainlayer", + "source": "claude_code", + "intent": "decision", + "tags": '["correction:factual"]', + } + + result = _exact_chunk_lookup_result("brainbar-filter01", mock_store, "compact", source="claude_code") + + assert result is None + + @pytest.mark.asyncio async def test_brain_search_chunk_id_context_routing_wins_over_exact_lookup(): """Explicit chunk_id context expansion should run before exact-id short-circuiting.""" @@ -137,6 +154,55 @@ async def test_brain_search_chunk_id_context_routing_wins_over_exact_lookup(): context_mock.assert_awaited_once_with(chunk_id=chunk_id, before=3, after=3) +@pytest.mark.asyncio +async def test_brain_search_chunk_id_context_blocks_audit_recursion_by_default(): + chunk_id = "rt-33abe108-recursive" + mock_store = MagicMock() + mock_store.get_chunk.return_value = { + "id": chunk_id, + "content": 'MCP BrainLayer Memory: Invalid JSON-RPC message: {"jsonrpc":"2.0"}', + "source_file": "session.jsonl", + "project": "brainlayer", + "tags": '["correction:factual", "auto-detected"]', + "created_at": "2026-05-16T09:15:00Z", + } + + with ( + patch("brainlayer.mcp.search_handler._get_vector_store", return_value=mock_store), + patch( + "brainlayer.mcp.search_handler._context", + new=AsyncMock(side_effect=AssertionError("audit-recursive chunk_id must not route to context")), + ), + ): + content, structured = await _brain_search(query="ignored", chunk_id=chunk_id, detail="compact") + + assert "No results found." in content[0].text + assert structured == {"query": "ignored", "total": 0, "results": []} + + +@pytest.mark.asyncio +async def test_brain_search_chunk_id_context_allows_audit_when_requested(): + chunk_id = "rt-33abe108-recursive" + mock_store = MagicMock() + mock_store.get_chunk.return_value = { + "id": chunk_id, + "content": 'MCP BrainLayer Memory: Invalid JSON-RPC message: {"jsonrpc":"2.0"}', + "source_file": "session.jsonl", + "project": "brainlayer", + "tags": '["correction:factual", "auto-detected"]', + "created_at": "2026-05-16T09:15:00Z", + } + + with ( + patch("brainlayer.mcp.search_handler._get_vector_store", return_value=mock_store), + patch("brainlayer.mcp.search_handler._context", new=AsyncMock(return_value=["context window"])) as context_mock, + ): + result = await _brain_search(query="ignored", chunk_id=chunk_id, detail="compact", include_audit=True) + + assert result == ["context window"] + context_mock.assert_awaited_once_with(chunk_id=chunk_id, before=3, after=3) + + @pytest.mark.asyncio async def test_brain_search_exact_chunk_id_respects_project_scope(): """Exact chunk-id bypass must not leak chunks outside the active project scope.""" From b54c8aefb50f1158965d6ad852ba50a0f463652e Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 16:39:49 +0300 Subject: [PATCH 03/15] style: format audit recursion filter --- src/brainlayer/search_repo.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index 63739192..400f39e4 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -157,13 +157,12 @@ def _audit_recursion_exclusion_sql(chunk_id_expr: str, tags_expr: str, *, use_ch f"COALESCE(CAST({tags_expr.replace('.tags', '.content') if '.tags' in tags_expr else 'content'} AS TEXT), '')" ) compact_content_expr = ( - f"REPLACE(REPLACE(REPLACE(REPLACE(LOWER({content_expr}), ' ', ''), " - "char(9), ''), char(10), ''), char(13), '')" + f"REPLACE(REPLACE(REPLACE(REPLACE(LOWER({content_expr}), ' ', ''), char(9), ''), char(10), ''), char(13), '')" ) recursive_content_filter = ( f"LTRIM({content_expr}) NOT LIKE '┌─ brain_search:%' " f"AND LOWER({content_expr}) NOT LIKE '%mcp brainlayer memory: invalid json-rpc message%' " - f"AND {compact_content_expr} NOT LIKE '%\"jsonrpc\":\"2.0\"%'" + f'AND {compact_content_expr} NOT LIKE \'%"jsonrpc":"2.0"%\'' ) return f"({tag_filter} AND {recursive_content_filter})" From 585cd9d208375f363f812285732a482725be49f2 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 16:45:49 +0300 Subject: [PATCH 04/15] fix: size audit overfetch and invalidate cache --- src/brainlayer/search_repo.py | 30 ++++++++++----- src/brainlayer/store.py | 2 + src/brainlayer/vector_store.py | 18 +++++++-- tests/test_audit_recursion_filter.py | 55 ++++++++++++++++++++++++++++ 4 files changed, 92 insertions(+), 13 deletions(-) diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index 400f39e4..5fc1e93d 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -307,11 +307,26 @@ def _checkpoint_filtered_knn_k(self, n_results: int, include_checkpoints: bool) return n_results return n_results + checkpoint_count - def _effective_knn_k(self, n_results: int, needs_overfetch: bool, include_checkpoints: bool) -> int: + def _audit_filtered_knn_k(self, n_results: int, include_audit: bool) -> int: + if include_audit: + return n_results + audit_count = self._audit_recursion_count() + if audit_count <= 0: + return n_results + return n_results + audit_count + + def _effective_knn_k( + self, + n_results: int, + needs_overfetch: bool, + include_checkpoints: bool, + include_audit: bool, + ) -> int: effective_k = n_results if needs_overfetch: effective_k = max(effective_k, min(n_results * 10, 1000)) - return self._checkpoint_filtered_knn_k(effective_k, include_checkpoints) + effective_k = self._checkpoint_filtered_knn_k(effective_k, include_checkpoints) + return self._audit_filtered_knn_k(effective_k, include_audit) def _load_chunk_embeddings(self, chunk_ids: List[str]) -> Dict[str, np.ndarray]: """Fetch float embeddings for the provided chunk IDs.""" @@ -574,9 +589,8 @@ def search( or (source_filter and source_filter != "claude_code") or source_filter_like or correction_category - or (not include_audit and self._audit_recursion_count() > 0) ) - effective_k = self._effective_knn_k(n_results, bool(needs_overfetch), include_checkpoints) + effective_k = self._effective_knn_k(n_results, bool(needs_overfetch), include_checkpoints, include_audit) params = [query_bytes, effective_k] + filter_params query = f""" SELECT c.id, c.content, c.metadata, c.source_file, c.project, @@ -918,13 +932,9 @@ def _binary_search( where_sql = "AND " + " AND ".join(where_clauses) needs_overfetch = ( - entity_id - or (source_filter and source_filter != "claude_code") - or source_filter_like - or correction_category - or (not include_audit and self._audit_recursion_count() > 0) + entity_id or (source_filter and source_filter != "claude_code") or source_filter_like or correction_category ) - effective_k = self._effective_knn_k(n_results, bool(needs_overfetch), include_checkpoints) + effective_k = self._effective_knn_k(n_results, bool(needs_overfetch), include_checkpoints, include_audit) params = [query_bytes, effective_k] + filter_params results = list( cursor.execute( diff --git a/src/brainlayer/store.py b/src/brainlayer/store.py index b25ae2db..90165459 100644 --- a/src/brainlayer/store.py +++ b/src/brainlayer/store.py @@ -286,6 +286,7 @@ def store_memory( from .search_repo import clear_hybrid_search_cache clear_hybrid_search_cache(getattr(store, "db_path", None)) + store._invalidate_audit_recursion_count_cache() if chunk_origin == CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT: store._invalidate_checkpoint_count_cache() @@ -342,6 +343,7 @@ def embed_pending_chunks( from .search_repo import clear_hybrid_search_cache clear_hybrid_search_cache(getattr(store, "db_path", None)) + store._invalidate_audit_recursion_count_cache() return count diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index e638da0b..47dd34ab 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -92,6 +92,8 @@ def __init__(self, db_path: Path): self._retrieval_strengthening_lock = threading.Lock() self._checkpoint_count_cache: int | None = None self._checkpoint_count_cache_data_version: int | None = None + self._audit_recursion_count_cache: int | None = None + self._audit_recursion_count_cache_data_version: int | None = None self._readonly = self.db_path.exists() and not os.access(self.db_path, os.W_OK) if self._readonly: self._init_readonly_db() @@ -121,6 +123,14 @@ def _invalidate_checkpoint_count_cache(self) -> None: self._checkpoint_count_cache = None self._checkpoint_count_cache_data_version = None + def _invalidate_audit_recursion_count_cache(self) -> None: + self._audit_recursion_count_cache = None + self._audit_recursion_count_cache_data_version = None + + def _invalidate_filtered_count_caches(self) -> None: + self._invalidate_checkpoint_count_cache() + self._invalidate_audit_recursion_count_cache() + def _checkpoint_wal_full(self, cursor) -> None: try: cursor.execute("PRAGMA wal_checkpoint(FULL)") @@ -301,7 +311,7 @@ def _init_db(self) -> None: ), ) self._checkpoint_wal_full(cursor) - self._invalidate_checkpoint_count_cache() + self._invalidate_filtered_count_caches() cursor.execute(""" UPDATE chunks @@ -1500,7 +1510,7 @@ def upsert_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[floa from .search_repo import clear_hybrid_search_cache clear_hybrid_search_cache(getattr(self, "db_path", None)) - self._invalidate_checkpoint_count_cache() + self._invalidate_filtered_count_caches() return len(chunks) @@ -1565,7 +1575,7 @@ def update_chunk( from .search_repo import clear_hybrid_search_cache clear_hybrid_search_cache(getattr(self, "db_path", None)) - self._invalidate_checkpoint_count_cache() + self._invalidate_filtered_count_caches() return True def archive_chunk(self, chunk_id: str) -> bool: @@ -1585,6 +1595,7 @@ def archive_chunk(self, chunk_id: str) -> bool: from .search_repo import clear_hybrid_search_cache clear_hybrid_search_cache(getattr(self, "db_path", None)) + self._invalidate_filtered_count_caches() return True def supersede_chunk(self, old_chunk_id: str, new_chunk_id: str) -> bool: @@ -1604,6 +1615,7 @@ def supersede_chunk(self, old_chunk_id: str, new_chunk_id: str) -> bool: from .search_repo import clear_hybrid_search_cache clear_hybrid_search_cache(getattr(self, "db_path", None)) + self._invalidate_filtered_count_caches() return True def get_chunk(self, chunk_id: str) -> Optional[Dict[str, Any]]: diff --git a/tests/test_audit_recursion_filter.py b/tests/test_audit_recursion_filter.py index be3cfa29..4c8bdee6 100644 --- a/tests/test_audit_recursion_filter.py +++ b/tests/test_audit_recursion_filter.py @@ -204,6 +204,61 @@ def busy_read_cursor(): store.close() +def test_audit_overfetch_scales_with_filtered_row_count(tmp_path, monkeypatch): + store = VectorStore(tmp_path / "audit-filter-overfetch-size.db") + try: + monkeypatch.setattr(store, "_audit_recursion_count", lambda: 1500) + + assert store._effective_knn_k(3, needs_overfetch=False, include_checkpoints=True, include_audit=False) == 1503 + finally: + store.close() + + +def test_audit_count_cache_invalidates_after_same_connection_upsert(tmp_path): + store = VectorStore(tmp_path / "audit-filter-cache-invalidation.db") + try: + query_embedding = [0.066] * 1024 + + assert store._audit_recursion_count() == 0 + assert store._audit_recursion_count_cache == 0 + + audit_chunks = [ + { + "id": f"audit-cache-neighbor-{index}", + "content": f"audit cache neighbor {index}", + "metadata": {}, + "source_file": "audit-cache.jsonl", + "project": "brainlayer", + "content_type": "assistant_text", + "source": "claude_code", + "char_count": len(f"audit cache neighbor {index}"), + "tags": ["audit"], + } + for index in range(30) + ] + normal_content = "ordinary same connection audit cache invalidation target" + normal_chunk = { + "id": "ordinary-after-same-connection-audit-cache", + "content": normal_content, + "metadata": {}, + "source_file": "audit-cache.jsonl", + "project": "brainlayer", + "content_type": "assistant_text", + "source": "claude_code", + "char_count": len(normal_content), + "tags": ["brainbar"], + } + store.upsert_chunks(audit_chunks + [normal_chunk], [query_embedding] * 30 + [[0.067] * 1024]) + + assert store._audit_recursion_count_cache is None + + results = store.search(query_embedding=query_embedding, n_results=3) + assert "ordinary-after-same-connection-audit-cache" in results["ids"][0] + assert all(not chunk_id.startswith("audit-cache-neighbor-") for chunk_id in results["ids"][0]) + finally: + store.close() + + def test_exact_r0x_tag_is_filtered_as_audit_shorthand(tmp_path): store = VectorStore(tmp_path / "audit-filter-r0x.db") try: From c73f0138b36969dbd878e95ac43bec6900a3ef49 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 17:00:12 +0300 Subject: [PATCH 05/15] fix: filter audit kg facts --- src/brainlayer/kg_repo.py | 22 +++++- src/brainlayer/mcp/search_handler.py | 32 ++++++-- src/brainlayer/search_repo.py | 28 ++++--- tests/test_audit_recursion_filter.py | 114 +++++++++++++++++++++++++++ 4 files changed, 177 insertions(+), 19 deletions(-) diff --git a/src/brainlayer/kg_repo.py b/src/brainlayer/kg_repo.py index b4f8be74..95d61cb5 100644 --- a/src/brainlayer/kg_repo.py +++ b/src/brainlayer/kg_repo.py @@ -982,6 +982,7 @@ def kg_search( relation_type: Optional[str] = None, limit: int = 20, include_checkpoints: bool = False, + include_audit: bool = False, ) -> List[Dict[str, Any]]: """Structured KG fact retrieval. Excludes co_occurs_with noise.""" results: List[Dict[str, Any]] = [] @@ -990,11 +991,16 @@ def kg_search( if entity: cursor = self._read_cursor() - checkpoint_join = "" + source_chunk_join = "" checkpoint_filter = "" + audit_filter = "" checkpoint_params: list[str] = [] + needs_source_chunk = ( + not include_checkpoints and getattr(self, "_has_chunk_origin", True) + ) or not include_audit + if needs_source_chunk: + source_chunk_join = "LEFT JOIN chunks source_chunk ON r.source_chunk_id = source_chunk.id" if not include_checkpoints and getattr(self, "_has_chunk_origin", True): - checkpoint_join = "LEFT JOIN chunks source_chunk ON r.source_chunk_id = source_chunk.id" checkpoint_filter = """ AND ( r.source_chunk_id IS NULL @@ -1003,6 +1009,14 @@ def kg_search( ) """ checkpoint_params.append("precompact_checkpoint") + if not include_audit: + audit_filter = f""" + AND ( + r.source_chunk_id IS NULL + OR source_chunk.id IS NULL + OR {self._audit_recursion_exclusion_sql("source_chunk.id", "source_chunk.tags", "source_chunk.content")} + ) + """ if relation_type: type_filter_src = "AND r.relation_type = ?" @@ -1024,10 +1038,11 @@ def kg_search( FROM kg_current_facts r JOIN kg_entities se ON r.source_id = se.id JOIN kg_entities te ON r.target_id = te.id - {checkpoint_join} + {source_chunk_join} WHERE ((r.source_id = ? {type_filter_src}) OR (r.target_id = ? {type_filter_tgt})) {checkpoint_filter} + {audit_filter} ORDER BY r.importance DESC, r.confidence DESC LIMIT ? """, @@ -1088,6 +1103,7 @@ def kg_hybrid_search( relation_type=relation_type, limit=n_results, include_checkpoints=bool(kwargs.get("include_checkpoints", False)), + include_audit=bool(kwargs.get("include_audit", False)), ) scored_facts = [] diff --git a/src/brainlayer/mcp/search_handler.py b/src/brainlayer/mcp/search_handler.py index 4e0d3c60..ee351a66 100644 --- a/src/brainlayer/mcp/search_handler.py +++ b/src/brainlayer/mcp/search_handler.py @@ -343,7 +343,13 @@ def _detect_entities(query: str, store: Any) -> list[dict]: return [] -def _kg_facts_sql(store: Any, entity_name: str, *, include_checkpoints: bool = False) -> list[dict]: +def _kg_facts_sql( + store: Any, + entity_name: str, + *, + include_checkpoints: bool = False, + include_audit: bool = False, +) -> list[dict]: """Pure SQL KG fact lookup — no embeddings, no vector search. Returns typed relations for an entity, excluding co_occurs_with noise. @@ -363,11 +369,16 @@ def _kg_facts_sql(store: Any, entity_name: str, *, include_checkpoints: bool = F entity_id = row[0][0] - checkpoint_join = "" + source_chunk_join = "" checkpoint_filter = "" + audit_filter = "" params: list[Any] = [entity_id, entity_id] + needs_source_chunk = ( + not include_checkpoints and getattr(store, "_has_chunk_origin", True) + ) or not include_audit + if needs_source_chunk: + source_chunk_join = "LEFT JOIN chunks source_chunk ON r.source_chunk_id = source_chunk.id" if not include_checkpoints and getattr(store, "_has_chunk_origin", True): - checkpoint_join = "LEFT JOIN chunks source_chunk ON r.source_chunk_id = source_chunk.id" checkpoint_filter = """ AND ( r.source_chunk_id IS NULL @@ -376,6 +387,14 @@ def _kg_facts_sql(store: Any, entity_name: str, *, include_checkpoints: bool = F ) """ params.append(CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT) + if not include_audit: + audit_filter = f""" + AND ( + r.source_chunk_id IS NULL + OR source_chunk.id IS NULL + OR {store._audit_recursion_exclusion_sql("source_chunk.id", "source_chunk.tags", "source_chunk.content")} + ) + """ # Get all semantic relations (exclude co_occurs_with) facts_raw = list( @@ -386,10 +405,11 @@ def _kg_facts_sql(store: Any, entity_name: str, *, include_checkpoints: bool = F FROM kg_relations r JOIN kg_entities se ON r.source_id = se.id JOIN kg_entities te ON r.target_id = te.id - {checkpoint_join} + {source_chunk_join} WHERE (r.source_id = ? OR r.target_id = ?) AND r.relation_type != 'co_occurs_with' {checkpoint_filter} + {audit_filter} ORDER BY r.confidence DESC LIMIT 20""", params, @@ -634,7 +654,9 @@ async def _brain_search( entity_name = detected_entities[0]["name"] # Path 1: Pure SQL KG facts (no embedding model needed, always runs) - fact_items = _kg_facts_sql(store, entity_name, include_checkpoints=include_checkpoints) + fact_items = _kg_facts_sql( + store, entity_name, include_checkpoints=include_checkpoints, include_audit=include_audit + ) # Path 2: Try full hybrid search (embedding + vector + KG) structured_results = [] diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index 5fc1e93d..6ca9e1be 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -135,7 +135,13 @@ def _audit_recursion_tag_predicate(tag_expr: str) -> str: return "(" + " OR ".join(pattern.format(tag_expr=lowered) for pattern in AUDIT_RECURSION_TAG_PATTERNS) + ")" -def _audit_recursion_exclusion_sql(chunk_id_expr: str, tags_expr: str, *, use_chunk_tags: bool = True) -> str: +def _audit_recursion_exclusion_sql( + chunk_id_expr: str, + tags_expr: str, + *, + content_expr: str = "content", + use_chunk_tags: bool = True, +) -> str: if use_chunk_tags: tag_filter = ( "NOT EXISTS (" @@ -153,14 +159,13 @@ def _audit_recursion_exclusion_sql(chunk_id_expr: str, tags_expr: str, *, use_ch ")" ) - content_expr = ( - f"COALESCE(CAST({tags_expr.replace('.tags', '.content') if '.tags' in tags_expr else 'content'} AS TEXT), '')" - ) + content_expr = f"COALESCE(CAST({content_expr} AS TEXT), '')" compact_content_expr = ( f"REPLACE(REPLACE(REPLACE(REPLACE(LOWER({content_expr}), ' ', ''), char(9), ''), char(10), ''), char(13), '')" ) + trimmed_content_expr = f"LTRIM({content_expr}, char(9) || char(10) || char(11) || char(12) || char(13) || char(32))" recursive_content_filter = ( - f"LTRIM({content_expr}) NOT LIKE '┌─ brain_search:%' " + f"LOWER({trimmed_content_expr}) NOT LIKE '┌─ brain_search:%' " f"AND LOWER({content_expr}) NOT LIKE '%mcp brainlayer memory: invalid json-rpc message%' " f'AND {compact_content_expr} NOT LIKE \'%"jsonrpc":"2.0"%\'' ) @@ -202,10 +207,11 @@ def _precompact_content_exclusion_sql(content_expr: str) -> str: class SearchMixin: """Search and query methods, mixed into VectorStore.""" - def _audit_recursion_exclusion_sql(self, chunk_id_expr: str, tags_expr: str) -> str: + def _audit_recursion_exclusion_sql(self, chunk_id_expr: str, tags_expr: str, content_expr: str) -> str: return _audit_recursion_exclusion_sql( chunk_id_expr, tags_expr, + content_expr=content_expr, use_chunk_tags=getattr(self, "_chunk_tags_available", True), ) @@ -223,7 +229,7 @@ def _audit_recursion_count(self) -> int: .execute( f""" SELECT COUNT(*) FROM chunks - WHERE NOT ({self._audit_recursion_exclusion_sql("id", "tags")}) + WHERE NOT ({self._audit_recursion_exclusion_sql("id", "tags", "content")}) """ ) .fetchone() @@ -566,7 +572,7 @@ def search( where_clauses.append("c.id IN (SELECT chunk_id FROM chunk_tags WHERE tag LIKE ?)") filter_params.append(f"correction:{correction_category}%") if not include_audit: - where_clauses.append(self._audit_recursion_exclusion_sql("c.id", "c.tags")) + where_clauses.append(self._audit_recursion_exclusion_sql("c.id", "c.tags", "c.content")) if not include_checkpoints: checkpoint_clause = self._checkpoint_exclusion_clause("c") if checkpoint_clause: @@ -652,7 +658,7 @@ def search( where_clauses.append("id IN (SELECT chunk_id FROM chunk_tags WHERE tag LIKE ?)") params.append(f"correction:{correction_category}%") if not include_audit: - where_clauses.append(self._audit_recursion_exclusion_sql("id", "tags")) + where_clauses.append(self._audit_recursion_exclusion_sql("id", "tags", "content")) if not include_checkpoints: checkpoint_clause = self._checkpoint_exclusion_clause() if checkpoint_clause: @@ -917,7 +923,7 @@ def _binary_search( where_clauses.append("c.id IN (SELECT chunk_id FROM chunk_tags WHERE tag LIKE ?)") filter_params.append(f"correction:{correction_category}%") if not include_audit: - where_clauses.append(self._audit_recursion_exclusion_sql("c.id", "c.tags")) + where_clauses.append(self._audit_recursion_exclusion_sql("c.id", "c.tags", "c.content")) if not include_checkpoints: checkpoint_clause = self._checkpoint_exclusion_clause("c") if checkpoint_clause: @@ -1234,7 +1240,7 @@ def hybrid_search( fts_extra.append("AND c.id IN (SELECT chunk_id FROM chunk_tags WHERE tag LIKE ?)") fts_filter_params.append(f"correction:{correction_category}%") if not include_audit: - fts_extra.append(f"AND {self._audit_recursion_exclusion_sql('c.id', 'c.tags')}") + fts_extra.append(f"AND {self._audit_recursion_exclusion_sql('c.id', 'c.tags', 'c.content')}") if not include_checkpoints: checkpoint_clause = self._checkpoint_exclusion_clause("c") if checkpoint_clause: diff --git a/tests/test_audit_recursion_filter.py b/tests/test_audit_recursion_filter.py index 4c8bdee6..729163e4 100644 --- a/tests/test_audit_recursion_filter.py +++ b/tests/test_audit_recursion_filter.py @@ -4,6 +4,8 @@ from brainlayer._helpers import serialize_f32 from brainlayer.engine import recall, think +from brainlayer.mcp.search_handler import _kg_facts_sql +from brainlayer.search_repo import _audit_recursion_exclusion_sql from brainlayer.vector_store import VectorStore @@ -377,6 +379,118 @@ def test_formatted_jsonrpc_content_is_filtered_by_sql_paths(tmp_path): store.close() +def test_brain_search_box_with_leading_non_space_whitespace_is_filtered_by_sql_paths(tmp_path): + store = VectorStore(tmp_path / "recursive-leading-whitespace-filter.db") + try: + query_embedding = [0.092] * 1024 + _insert_chunk( + store, + "recursive-leading-whitespace-box", + '\n\t┌─ brain_search: "BrainLayer audit recursion"\n│ recursive output', + ["auto-detected"], + query_embedding, + ) + _insert_chunk( + store, + "ordinary-leading-whitespace-control", + "BrainLayer MCP guard should still return ordinary memories", + ["brainlayer", "mcp"], + [0.093] * 1024, + ) + store.build_binary_index() + + vector_results = store.search(query_embedding=query_embedding, n_results=5) + hybrid_results = store.hybrid_search( + query_embedding=query_embedding, + query_text="BrainLayer MCP guard", + n_results=5, + ) + + assert "recursive-leading-whitespace-box" not in vector_results["ids"][0] + assert "ordinary-leading-whitespace-control" in vector_results["ids"][0] + assert "recursive-leading-whitespace-box" not in hybrid_results["ids"][0] + assert "ordinary-leading-whitespace-control" in hybrid_results["ids"][0] + finally: + store.close() + + +def test_audit_recursion_sql_accepts_explicit_content_expression(): + clause = _audit_recursion_exclusion_sql( + "c.id", + "audit_tags.tags", + content_expr="c.content", + use_chunk_tags=False, + ) + + assert "c.content" in clause + assert "audit_tags.content" not in clause + + +def test_kg_facts_exclude_audit_sourced_relations_by_default(tmp_path): + store = VectorStore(tmp_path / "audit-filter-kg-facts.db") + try: + query_embedding = [0.094] * 1024 + _insert_chunk( + store, + "audit-fact-chunk", + "Etan stores recursive audit output", + ["audit"], + query_embedding, + ) + _insert_chunk( + store, + "normal-fact-chunk", + "Etan stores durable memory", + ["brainlayer"], + [0.095] * 1024, + ) + store.upsert_entity("person-etan", "person", "Etan") + store.upsert_entity("project-audit", "project", "Audit Project") + store.upsert_entity("project-normal", "project", "Normal Project") + store.add_relation( + "rel-audit", + "person-etan", + "project-audit", + "mentions", + fact="audit-sourced fact", + source_chunk_id="audit-fact-chunk", + ) + store.add_relation( + "rel-normal", + "person-etan", + "project-normal", + "maintains", + fact="normal fact", + source_chunk_id="normal-fact-chunk", + ) + + sql_default_facts = _kg_facts_sql(store, "Etan") + sql_audit_facts = _kg_facts_sql(store, "Etan", include_audit=True) + hybrid_default = store.kg_hybrid_search( + query_embedding=query_embedding, + query_text="Etan", + n_results=10, + entity_name="Etan", + ) + hybrid_with_audit = store.kg_hybrid_search( + query_embedding=query_embedding, + query_text="Etan", + n_results=10, + entity_name="Etan", + include_audit=True, + ) + + assert {fact["target"] for fact in sql_default_facts} == {"Normal Project"} + assert {fact["target"] for fact in sql_audit_facts} == {"Audit Project", "Normal Project"} + assert {fact["target_entity"]["name"] for fact in hybrid_default["facts"]} == {"Normal Project"} + assert {fact["target_entity"]["name"] for fact in hybrid_with_audit["facts"]} == { + "Audit Project", + "Normal Project", + } + finally: + store.close() + + def test_engine_think_and_recall_forward_include_audit(): class MockStore: def __init__(self): From c7b18695d51a517a28b6b3fafae0f35eacdb1e66 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 17:12:45 +0300 Subject: [PATCH 06/15] fix: filter audit context windows --- src/brainlayer/kg_repo.py | 6 +-- src/brainlayer/mcp/search_handler.py | 31 +++++++++--- src/brainlayer/search_repo.py | 57 +++++++++++++++++++-- tests/test_audit_recursion_filter.py | 71 ++++++++++++++++++++++++++- tests/test_phase6_critical.py | 8 ++- tests/test_precompact_chunk_origin.py | 63 ++++++++++++++++++++++++ tests/test_search_exact_chunk_id.py | 16 +++++- 7 files changed, 234 insertions(+), 18 deletions(-) diff --git a/src/brainlayer/kg_repo.py b/src/brainlayer/kg_repo.py index 95d61cb5..34beb0b9 100644 --- a/src/brainlayer/kg_repo.py +++ b/src/brainlayer/kg_repo.py @@ -1001,14 +1001,14 @@ def kg_search( if needs_source_chunk: source_chunk_join = "LEFT JOIN chunks source_chunk ON r.source_chunk_id = source_chunk.id" if not include_checkpoints and getattr(self, "_has_chunk_origin", True): - checkpoint_filter = """ + checkpoint_clause = self._checkpoint_exclusion_clause("source_chunk") + checkpoint_filter = f""" AND ( r.source_chunk_id IS NULL OR source_chunk.id IS NULL - OR COALESCE(source_chunk.chunk_origin, 'unknown') != ? + OR ({checkpoint_clause}) ) """ - checkpoint_params.append("precompact_checkpoint") if not include_audit: audit_filter = f""" AND ( diff --git a/src/brainlayer/mcp/search_handler.py b/src/brainlayer/mcp/search_handler.py index ee351a66..de139a5e 100644 --- a/src/brainlayer/mcp/search_handler.py +++ b/src/brainlayer/mcp/search_handler.py @@ -379,14 +379,14 @@ def _kg_facts_sql( if needs_source_chunk: source_chunk_join = "LEFT JOIN chunks source_chunk ON r.source_chunk_id = source_chunk.id" if not include_checkpoints and getattr(store, "_has_chunk_origin", True): - checkpoint_filter = """ + checkpoint_clause = store._checkpoint_exclusion_clause("source_chunk") + checkpoint_filter = f""" AND ( r.source_chunk_id IS NULL OR source_chunk.id IS NULL - OR COALESCE(source_chunk.chunk_origin, 'unknown') != ? + OR ({checkpoint_clause}) ) """ - params.append(CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT) if not include_audit: audit_filter = f""" AND ( @@ -524,7 +524,13 @@ async def _brain_search( ): empty = {"query": query, "total": 0, "results": []} return ([TextContent(type="text", text="No results found.")], empty) - return await _context(chunk_id=chunk_id, before=before, after=after) + return await _context( + chunk_id=chunk_id, + before=before, + after=after, + include_checkpoints=include_checkpoints, + include_audit=include_audit, + ) if file_path is not None and _query_has_regression_signal(query): regression_result = await _regression(file_path=file_path, project=project) @@ -1281,11 +1287,24 @@ async def _list_projects() -> list[TextContent]: return _error_result(f"Error listing projects: {str(e)}") -async def _context(chunk_id: str, before: int = 3, after: int = 3) -> list[TextContent]: +async def _context( + chunk_id: str, + before: int = 3, + after: int = 3, + *, + include_checkpoints: bool = False, + include_audit: bool = False, +) -> list[TextContent]: """Get surrounding conversation context for a chunk.""" try: store = _get_vector_store() - result = store.get_context(chunk_id, before=before, after=after) + result = store.get_context( + chunk_id, + before=before, + after=after, + include_checkpoints=include_checkpoints, + include_audit=include_audit, + ) if result.get("error"): return _error_result(f"Unknown chunk_id '{chunk_id[:20]}...'. Use chunk_id from brainlayer_search results.") if not result.get("context"): diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index 6ca9e1be..276f8cfc 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -15,7 +15,7 @@ import numpy as np from ._helpers import _escape_fts5_query, _is_sqlite_busy_error, serialize_f32 -from .chunk_origin import CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT +from .chunk_origin import CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT, is_precompact_checkpoint_content from .dedupe import resolve_chunk_id from .ingest_guard import recursive_mcp_output_reason @@ -1495,7 +1495,38 @@ def _ingest_keyword_rows(rows: list[tuple], ranks: dict[str, int]) -> None: return result - def get_context(self, chunk_id: str, before: int = 3, after: int = 3) -> Dict[str, Any]: + def _context_chunk_is_filtered( + self, + *, + content: str | None, + tags: Any, + chunk_origin: str | None, + include_checkpoints: bool, + include_audit: bool, + ) -> bool: + if not include_checkpoints and ( + chunk_origin == CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT or is_precompact_checkpoint_content(content) + ): + return True + if include_audit: + return False + parsed_tags = tags + if isinstance(tags, str): + try: + parsed_tags = json.loads(tags) + except (json.JSONDecodeError, TypeError): + parsed_tags = [] + return _is_audit_recursion_metadata({"tags": parsed_tags if isinstance(parsed_tags, list) else []}, content) + + def get_context( + self, + chunk_id: str, + before: int = 3, + after: int = 3, + *, + include_checkpoints: bool = False, + include_audit: bool = False, + ) -> Dict[str, Any]: """Get surrounding chunks from the same conversation.""" read_conn = self._get_read_conn() cursor = read_conn.cursor() @@ -1505,7 +1536,7 @@ def get_context(self, chunk_id: str, before: int = 3, after: int = 3) -> Dict[st target = list( cursor.execute( """ - SELECT conversation_id, position, content, metadata, content_type + SELECT conversation_id, position, content, metadata, content_type, tags, chunk_origin FROM chunks WHERE id = ? """, (chunk_id,), @@ -1515,7 +1546,15 @@ def get_context(self, chunk_id: str, before: int = 3, after: int = 3) -> Dict[st if not target: return {"target": None, "context": [], "error": "Chunk not found"} - conv_id, position, content, metadata, content_type = target[0] + conv_id, position, content, metadata, content_type, tags, chunk_origin = target[0] + if self._context_chunk_is_filtered( + content=content, + tags=tags, + chunk_origin=chunk_origin, + include_checkpoints=include_checkpoints, + include_audit=include_audit, + ): + return {"target": None, "context": [], "error": "Chunk not found"} if not conv_id or position is None: # Standalone chunks (for example, manual-* chunks created via brain_store) @@ -1538,7 +1577,7 @@ def get_context(self, chunk_id: str, before: int = 3, after: int = 3) -> Dict[st context_rows = list( cursor.execute( """ - SELECT id, content, position, content_type + SELECT id, content, position, content_type, tags, chunk_origin FROM chunks WHERE conversation_id = ? AND position BETWEEN ? AND ? @@ -1550,6 +1589,14 @@ def get_context(self, chunk_id: str, before: int = 3, after: int = 3) -> Dict[st context = [] for row in context_rows: + if self._context_chunk_is_filtered( + content=row[1], + tags=row[4], + chunk_origin=row[5], + include_checkpoints=include_checkpoints, + include_audit=include_audit, + ): + continue context.append( { "id": row[0], diff --git a/tests/test_audit_recursion_filter.py b/tests/test_audit_recursion_filter.py index 729163e4..5ac88e42 100644 --- a/tests/test_audit_recursion_filter.py +++ b/tests/test_audit_recursion_filter.py @@ -1,10 +1,11 @@ import json import apsw +import pytest from brainlayer._helpers import serialize_f32 from brainlayer.engine import recall, think -from brainlayer.mcp.search_handler import _kg_facts_sql +from brainlayer.mcp.search_handler import _brain_search, _kg_facts_sql from brainlayer.search_repo import _audit_recursion_exclusion_sql from brainlayer.vector_store import VectorStore @@ -28,6 +29,29 @@ def _insert_chunk(store: VectorStore, chunk_id: str, content: str, tags: list[st ) +def _insert_context_chunk( + store: VectorStore, + *, + chunk_id: str, + content: str, + tags: list[str], + position: int, +) -> None: + cursor = store.conn.cursor() + cursor.execute( + """INSERT INTO chunks ( + id, content, metadata, source_file, project, content_type, + char_count, source, tags, conversation_id, position + ) VALUES (?, ?, '{}', 'audit-context.jsonl', 'brainlayer', + 'assistant_text', ?, 'claude_code', ?, 'audit-context-session', ?)""", + (chunk_id, content, len(content), json.dumps(tags), position), + ) + cursor.executemany( + "INSERT OR IGNORE INTO chunk_tags (chunk_id, tag) VALUES (?, ?)", + [(chunk_id, tag) for tag in tags], + ) + + def test_hybrid_search_excludes_audit_recursion_by_default(tmp_path): store = VectorStore(tmp_path / "audit-filter.db") try: @@ -491,6 +515,51 @@ def test_kg_facts_exclude_audit_sourced_relations_by_default(tmp_path): store.close() +@pytest.mark.asyncio +async def test_chunk_context_filters_audit_neighbors_by_default(tmp_path, monkeypatch): + store = VectorStore(tmp_path / "audit-context-filter.db") + try: + recursive_content = 'MCP BrainLayer Memory: Invalid JSON-RPC message: {"jsonrpc":"2.0"}' + _insert_context_chunk( + store, + chunk_id="audit-context-neighbor", + content=recursive_content, + tags=["audit"], + position=1, + ) + _insert_context_chunk( + store, + chunk_id="normal-context-target", + content="Normal context target", + tags=["brainlayer"], + position=2, + ) + _insert_context_chunk( + store, + chunk_id="normal-context-neighbor", + content="Normal context neighbor", + tags=["brainlayer"], + position=3, + ) + monkeypatch.setattr("brainlayer.mcp.search_handler._get_vector_store", lambda: store) + + default_content = await _brain_search(query="ignored", chunk_id="normal-context-target", before=1, after=1) + audit_content = await _brain_search( + query="ignored", + chunk_id="normal-context-target", + before=1, + after=1, + include_audit=True, + ) + + assert recursive_content not in default_content[0].text + assert "Normal context target" in default_content[0].text + assert "Normal context neighbor" in default_content[0].text + assert recursive_content in audit_content[0].text + finally: + store.close() + + def test_engine_think_and_recall_forward_include_audit(): class MockStore: def __init__(self): diff --git a/tests/test_phase6_critical.py b/tests/test_phase6_critical.py index c232e96f..2ebf5609 100644 --- a/tests/test_phase6_critical.py +++ b/tests/test_phase6_critical.py @@ -153,7 +153,13 @@ def test_search_routing_chunk_id(self): from brainlayer.mcp.search_handler import _brain_search asyncio.run(_brain_search(query="expand this", chunk_id="test-chunk-001")) - mock_context.assert_called_once_with(chunk_id="test-chunk-001", before=3, after=3) + mock_context.assert_called_once_with( + chunk_id="test-chunk-001", + before=3, + after=3, + include_checkpoints=False, + include_audit=False, + ) def test_search_routing_file_path(self): """file_path parameter → routes to file timeline + recall.""" diff --git a/tests/test_precompact_chunk_origin.py b/tests/test_precompact_chunk_origin.py index 3857f631..eeabc686 100644 --- a/tests/test_precompact_chunk_origin.py +++ b/tests/test_precompact_chunk_origin.py @@ -748,6 +748,69 @@ def test_kg_hybrid_search_facts_excludes_checkpoint_sourced_relations_by_default } +def test_kg_facts_exclude_legacy_checkpoint_content_when_origin_unknown(tmp_path): + store = VectorStore(tmp_path / "kg-legacy-checkpoint-content-facts.db") + _insert_chunk( + store, + chunk_id="legacy-checkpoint-content-fact-chunk", + content="[PreCompact checkpoint]\nEtan builds legacy checkpoint-only project", + chunk_origin=CHUNK_ORIGIN_UNKNOWN, + ) + _insert_chunk( + store, + chunk_id="normal-legacy-control-fact-chunk", + content="Etan builds durable legacy control project", + chunk_origin=CHUNK_ORIGIN_UNKNOWN, + ) + store.upsert_entity("person-etan", "person", "Etan") + store.upsert_entity("project-legacy-checkpoint", "project", "Legacy Checkpoint Project") + store.upsert_entity("project-legacy-normal", "project", "Legacy Normal Project") + store.add_relation( + "rel-legacy-checkpoint-content", + "person-etan", + "project-legacy-checkpoint", + "builds", + fact="legacy checkpoint-only fact", + source_chunk_id="legacy-checkpoint-content-fact-chunk", + ) + store.add_relation( + "rel-legacy-normal", + "person-etan", + "project-legacy-normal", + "maintains", + fact="legacy normal fact", + source_chunk_id="normal-legacy-control-fact-chunk", + ) + + sql_default_facts = _kg_facts_sql(store, "Etan") + sql_checkpoint_facts = _kg_facts_sql(store, "Etan", include_checkpoints=True) + hybrid_default = store.kg_hybrid_search( + query_embedding=_embed("Etan"), + query_text="Etan", + n_results=10, + entity_name="Etan", + ) + hybrid_with_checkpoints = store.kg_hybrid_search( + query_embedding=_embed("Etan"), + query_text="Etan", + n_results=10, + entity_name="Etan", + include_checkpoints=True, + ) + store.close() + + assert {fact["target"] for fact in sql_default_facts} == {"Legacy Normal Project"} + assert {fact["target"] for fact in sql_checkpoint_facts} == { + "Legacy Checkpoint Project", + "Legacy Normal Project", + } + assert {fact["target_entity"]["name"] for fact in hybrid_default["facts"]} == {"Legacy Normal Project"} + assert {fact["target_entity"]["name"] for fact in hybrid_with_checkpoints["facts"]} == { + "Legacy Checkpoint Project", + "Legacy Normal Project", + } + + def test_binary_search_overfetches_when_checkpoint_filter_discards_nearest_neighbors(tmp_path): store = VectorStore(tmp_path / "binary-overfetch.db") query_embedding = [1.0] + ([0.0] * 1023) diff --git a/tests/test_search_exact_chunk_id.py b/tests/test_search_exact_chunk_id.py index 4088bffb..e9d7cf5b 100644 --- a/tests/test_search_exact_chunk_id.py +++ b/tests/test_search_exact_chunk_id.py @@ -151,7 +151,13 @@ async def test_brain_search_chunk_id_context_routing_wins_over_exact_lookup(): result = await _brain_search(query=chunk_id, chunk_id=chunk_id, detail="compact") assert result == ["context window"] - context_mock.assert_awaited_once_with(chunk_id=chunk_id, before=3, after=3) + context_mock.assert_awaited_once_with( + chunk_id=chunk_id, + before=3, + after=3, + include_checkpoints=False, + include_audit=False, + ) @pytest.mark.asyncio @@ -200,7 +206,13 @@ async def test_brain_search_chunk_id_context_allows_audit_when_requested(): result = await _brain_search(query="ignored", chunk_id=chunk_id, detail="compact", include_audit=True) assert result == ["context window"] - context_mock.assert_awaited_once_with(chunk_id=chunk_id, before=3, after=3) + context_mock.assert_awaited_once_with( + chunk_id=chunk_id, + before=3, + after=3, + include_checkpoints=False, + include_audit=True, + ) @pytest.mark.asyncio From 7ae7c7e578957acc1459a1293225ef766facbd1f Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 17:23:25 +0300 Subject: [PATCH 07/15] fix: normalize checkpoint SQL whitespace --- src/brainlayer/search_repo.py | 3 ++- tests/test_precompact_chunk_origin.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index 276f8cfc..bea8cd52 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -190,7 +190,8 @@ def _is_audit_recursion_metadata(meta: dict, content: str | None = None) -> bool def _precompact_content_exclusion_sql(content_expr: str) -> str: - normalized = f"LOWER(LTRIM(COALESCE(CAST({content_expr} AS TEXT), '')))" + whitespace_chars = "char(9) || char(10) || char(11) || char(12) || char(13) || char(32)" + normalized = f"LOWER(LTRIM(COALESCE(CAST({content_expr} AS TEXT), ''), {whitespace_chars}))" prefix = f"SUBSTR({normalized}, 1, 1024)" return ( f"{normalized} NOT LIKE '[precompact checkpoint]%' " diff --git a/tests/test_precompact_chunk_origin.py b/tests/test_precompact_chunk_origin.py index eeabc686..37e22ce6 100644 --- a/tests/test_precompact_chunk_origin.py +++ b/tests/test_precompact_chunk_origin.py @@ -344,7 +344,7 @@ def test_search_excludes_checkpoint_content_even_when_origin_backfill_missed_it( _insert_chunk( store, chunk_id="missed-origin-checkpoint", - content="[PreCompact checkpoint]\nCurrent task: should not leak through default search", + content="\t\n[PreCompact checkpoint]\nCurrent task: should not leak through default search", chunk_origin=CHUNK_ORIGIN_UNKNOWN, ) _insert_chunk( From 80563c0af33ccb91fe24d4a953172721808f99f9 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 17:33:12 +0300 Subject: [PATCH 08/15] fix: guard recursive chunk updates --- src/brainlayer/vector_store.py | 1 + tests/test_ingest_guard.py | 23 +++++++++++++++++++++++ 2 files changed, 24 insertions(+) diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index 47dd34ab..3e18347b 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -1529,6 +1529,7 @@ def update_chunk( return False if content is not None: + reject_recursive_mcp_output(content) created_at_row = cursor.execute("SELECT created_at FROM chunks WHERE id = ?", (chunk_id,)).fetchone() dedupe_fields = compute_dedupe_fields(content, created_at_row[0] if created_at_row else None) cursor.execute( diff --git a/tests/test_ingest_guard.py b/tests/test_ingest_guard.py index a7f18445..a7a629bc 100644 --- a/tests/test_ingest_guard.py +++ b/tests/test_ingest_guard.py @@ -64,6 +64,29 @@ def test_vector_upsert_rejects_recursive_mcp_output(tmp_path): ) +def test_update_chunk_rejects_recursive_mcp_output(tmp_path): + with VectorStore(tmp_path / "update-guard.db") as store: + store.upsert_chunks( + [ + { + "id": "safe-update-target", + "content": "ordinary memory before attempted recursive update", + "metadata": {}, + "source_file": "session.jsonl", + "project": "brainlayer", + "content_type": "assistant_text", + "char_count": 47, + } + ], + [[0.2] * 1024], + ) + + with pytest.raises(ValueError, match="recursive MCP output"): + store.update_chunk("safe-update-target", content=JSONRPC_RECURSION_CONTENT) + + assert store.get_chunk("safe-update-target")["content"] == "ordinary memory before attempted recursive update" + + def test_drain_drops_recursive_mcp_output_events(tmp_path, monkeypatch): db_path = tmp_path / "drain-guard.db" queue_dir = tmp_path / "queue" From e1b8d1e4e5b027734b8cda548f0dd4a6d6aff026 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 17:53:55 +0300 Subject: [PATCH 09/15] fix: harden audit recursion filters --- src/brainlayer/ingest_guard.py | 6 +++++ src/brainlayer/search_repo.py | 10 +++++++-- tests/test_audit_recursion_filter.py | 33 ++++++++++++++++++++++++++++ tests/test_ingest_guard.py | 6 +++++ 4 files changed, 53 insertions(+), 2 deletions(-) diff --git a/src/brainlayer/ingest_guard.py b/src/brainlayer/ingest_guard.py index 8270d3f5..2042d1c8 100644 --- a/src/brainlayer/ingest_guard.py +++ b/src/brainlayer/ingest_guard.py @@ -7,6 +7,10 @@ _JSONRPC_MESSAGE_RE = re.compile(r'"jsonrpc"\s*:\s*"2\.0"', re.IGNORECASE) _INVALID_JSONRPC_MARKER = "mcp brainlayer memory: invalid json-rpc message" _BRAIN_SEARCH_BOX_PREFIX = "┌─ brain_search:" +_BRAINLAYER_BOX_PREFIX_RE = re.compile( + r"^┌─\s*(?:brain_[a-z_]+|entity(?:\s+search)?):", + re.IGNORECASE, +) def recursive_mcp_output_reason(content: str | None) -> str | None: @@ -17,6 +21,8 @@ def recursive_mcp_output_reason(content: str | None) -> str | None: stripped = str(content).lstrip() if stripped.startswith(_BRAIN_SEARCH_BOX_PREFIX): return "brain_search_output" + if _BRAINLAYER_BOX_PREFIX_RE.match(stripped): + return "brainlayer_mcp_output" folded = stripped.casefold() if _INVALID_JSONRPC_MARKER in folded: diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index bea8cd52..2ec98f5b 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -42,7 +42,10 @@ ] META_NOISE_PATTERNS_CASEFOLDED = [pattern.casefold() for pattern in META_NOISE_PATTERNS] AUDIT_RECURSION_TAG_PATTERNS = ( - "{tag_expr} LIKE '%audit%'", + "{tag_expr} = 'audit'", + "{tag_expr} = 'audit-recursion'", + "{tag_expr} = 'audit_recursion'", + "{tag_expr} = 'meta-research'", "{tag_expr} = 'r0x'", "{tag_expr} = 'r02'", "{tag_expr} GLOB 'r0[0-9]'", @@ -166,6 +169,9 @@ def _audit_recursion_exclusion_sql( trimmed_content_expr = f"LTRIM({content_expr}, char(9) || char(10) || char(11) || char(12) || char(13) || char(32))" recursive_content_filter = ( f"LOWER({trimmed_content_expr}) NOT LIKE '┌─ brain_search:%' " + f"AND LOWER({trimmed_content_expr}) NOT GLOB '┌─ brain_*:*' " + f"AND LOWER({trimmed_content_expr}) NOT LIKE '┌─ entity:%' " + f"AND LOWER({trimmed_content_expr}) NOT LIKE '┌─ entity search:%' " f"AND LOWER({content_expr}) NOT LIKE '%mcp brainlayer memory: invalid json-rpc message%' " f'AND {compact_content_expr} NOT LIKE \'%"jsonrpc":"2.0"%\'' ) @@ -180,7 +186,7 @@ def _is_audit_recursion_metadata(meta: dict, content: str | None = None) -> bool return False for tag in tags: normalized = str(tag).casefold() - if "audit" in normalized: + if normalized in {"audit", "audit-recursion", "audit_recursion", "meta-research"}: return True if normalized in {"r02", "r0x"}: return True diff --git a/tests/test_audit_recursion_filter.py b/tests/test_audit_recursion_filter.py index 5ac88e42..3854df96 100644 --- a/tests/test_audit_recursion_filter.py +++ b/tests/test_audit_recursion_filter.py @@ -118,6 +118,30 @@ def test_hybrid_search_does_not_exclude_r0x_substrings_inside_normal_tags(tmp_pa store.close() +def test_hybrid_search_does_not_exclude_normal_audit_word_tags(tmp_path): + store = VectorStore(tmp_path / "audit-filter-normal-audit-tag.db") + try: + query_embedding = [0.031] * 1024 + _insert_chunk( + store, + "ordinary-security-audit-memory", + "security audit finding with real operational content should remain searchable", + ["security-audit", "reliability"], + query_embedding, + ) + store.build_binary_index() + + results = store.hybrid_search( + query_embedding=query_embedding, + query_text="security audit operational content", + n_results=3, + ) + + assert "ordinary-security-audit-memory" in results["ids"][0] + finally: + store.close() + + def test_readonly_legacy_db_without_chunk_tags_uses_json_tag_fallback(tmp_path): db_path = tmp_path / "legacy-readonly-audit-filter.db" store = VectorStore(db_path) @@ -335,6 +359,13 @@ def test_recursive_mcp_output_content_is_filtered_even_without_audit_tags(tmp_pa ["auto-detected"], query_embedding, ) + _insert_chunk( + store, + "recursive-entity-search-box", + "┌─ Entity search: BrainLayer\n│ recursive entity output", + ["auto-detected"], + query_embedding, + ) _insert_chunk( store, "ordinary-mcp-memory", @@ -354,6 +385,7 @@ def test_recursive_mcp_output_content_is_filtered_even_without_audit_tags(tmp_pa assert "ordinary-mcp-memory" in ids assert "recursive-jsonrpc-output" not in ids assert "recursive-brain-search-box" not in ids + assert "recursive-entity-search-box" not in ids audit_results = store.hybrid_search( query_embedding=query_embedding, @@ -364,6 +396,7 @@ def test_recursive_mcp_output_content_is_filtered_even_without_audit_tags(tmp_pa assert "recursive-jsonrpc-output" in audit_results["ids"][0] assert "recursive-brain-search-box" in audit_results["ids"][0] + assert "recursive-entity-search-box" in audit_results["ids"][0] finally: store.close() diff --git a/tests/test_ingest_guard.py b/tests/test_ingest_guard.py index a7a629bc..6b159bf1 100644 --- a/tests/test_ingest_guard.py +++ b/tests/test_ingest_guard.py @@ -28,6 +28,12 @@ def test_watcher_preclassify_rejects_brain_search_output_box(): assert should_skip_entry(entry) == "recursive_mcp_output" +def test_watcher_preclassify_rejects_other_brainlayer_mcp_output_boxes(): + entry = _make_entry("┌─ Entity search: Etan\n│ recursive entity output\n└─") + + assert should_skip_entry(entry) == "recursive_mcp_output" + + def test_watcher_postchunk_rejects_jsonrpc_mcp_memory_output(): assert should_skip_chunk_content(JSONRPC_RECURSION_CONTENT) == "recursive_mcp_output" From c57418c44925aacb0b1038fbfd110bcb92850150 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 18:07:50 +0300 Subject: [PATCH 10/15] fix: avoid unnecessary audit cache invalidation --- src/brainlayer/store.py | 1 - 1 file changed, 1 deletion(-) diff --git a/src/brainlayer/store.py b/src/brainlayer/store.py index 90165459..7476d394 100644 --- a/src/brainlayer/store.py +++ b/src/brainlayer/store.py @@ -343,7 +343,6 @@ def embed_pending_chunks( from .search_repo import clear_hybrid_search_cache clear_hybrid_search_cache(getattr(store, "db_path", None)) - store._invalidate_audit_recursion_count_cache() return count From 12f5c167f864170479cf3ad736f7f88d12bad36c Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 18:20:44 +0300 Subject: [PATCH 11/15] fix: skip rejected chunks before upsert transaction --- src/brainlayer/vector_store.py | 17 ++++++++++++++--- tests/test_ingest_guard.py | 31 +++++++++++++++++++++++++++++++ 2 files changed, 45 insertions(+), 3 deletions(-) diff --git a/src/brainlayer/vector_store.py b/src/brainlayer/vector_store.py index 3e18347b..33722ac5 100644 --- a/src/brainlayer/vector_store.py +++ b/src/brainlayer/vector_store.py @@ -1363,15 +1363,26 @@ def upsert_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[floa if len(chunks) != len(embeddings): raise ValueError("Chunks and embeddings must have same length") + valid_pairs: list[tuple[Dict[str, Any], List[float]]] = [] + rejected_error: ValueError | None = None + for chunk, embedding in zip(chunks, embeddings): + try: + reject_recursive_mcp_output(chunk.get("content")) + except ValueError as exc: + rejected_error = rejected_error or exc + continue + valid_pairs.append((chunk, embedding)) + if not valid_pairs and rejected_error is not None: + raise rejected_error + for attempt in range(5): cursor = self.conn.cursor() transaction_started = False try: cursor.execute("BEGIN IMMEDIATE") transaction_started = True - for chunk, embedding in zip(chunks, embeddings): + for chunk, embedding in valid_pairs: chunk_id = chunk["id"] - reject_recursive_mcp_output(chunk.get("content")) created_at = chunk.get("created_at") tags_value = chunk.get("tags") tags_json = json.dumps(tags_value) if isinstance(tags_value, (list, dict)) else tags_value @@ -1512,7 +1523,7 @@ def upsert_chunks(self, chunks: List[Dict[str, Any]], embeddings: List[List[floa clear_hybrid_search_cache(getattr(self, "db_path", None)) self._invalidate_filtered_count_caches() - return len(chunks) + return len(valid_pairs) def update_chunk( self, diff --git a/tests/test_ingest_guard.py b/tests/test_ingest_guard.py index 6b159bf1..4a088387 100644 --- a/tests/test_ingest_guard.py +++ b/tests/test_ingest_guard.py @@ -70,6 +70,37 @@ def test_vector_upsert_rejects_recursive_mcp_output(tmp_path): ) +def test_vector_upsert_skips_recursive_mcp_output_without_rolling_back_valid_siblings(tmp_path): + with VectorStore(tmp_path / "upsert-mixed-guard.db") as store: + count = store.upsert_chunks( + [ + { + "id": "valid-sibling", + "content": "valid operational memory beside recursive output", + "metadata": {}, + "source_file": "session.jsonl", + "project": "brainlayer", + "content_type": "assistant_text", + "char_count": 48, + }, + { + "id": "recursive-sibling", + "content": JSONRPC_RECURSION_CONTENT, + "metadata": {}, + "source_file": "session.jsonl", + "project": "brainlayer", + "content_type": "assistant_text", + "char_count": len(JSONRPC_RECURSION_CONTENT), + }, + ], + [[0.1] * 1024, [0.2] * 1024], + ) + + assert count == 1 + assert store.get_chunk("valid-sibling")["content"] == "valid operational memory beside recursive output" + assert store.get_chunk("recursive-sibling") is None + + def test_update_chunk_rejects_recursive_mcp_output(tmp_path): with VectorStore(tmp_path / "update-guard.db") as store: store.upsert_chunks( From 36dda5f6969d2284b390e96d870252497456bf65 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 18:38:54 +0300 Subject: [PATCH 12/15] fix: use literal recursive search SQL pattern --- src/brainlayer/search_repo.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index 2ec98f5b..aaa68b2a 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -168,7 +168,7 @@ def _audit_recursion_exclusion_sql( ) trimmed_content_expr = f"LTRIM({content_expr}, char(9) || char(10) || char(11) || char(12) || char(13) || char(32))" recursive_content_filter = ( - f"LOWER({trimmed_content_expr}) NOT LIKE '┌─ brain_search:%' " + f"LOWER({trimmed_content_expr}) NOT GLOB '┌─ brain_search:*' " f"AND LOWER({trimmed_content_expr}) NOT GLOB '┌─ brain_*:*' " f"AND LOWER({trimmed_content_expr}) NOT LIKE '┌─ entity:%' " f"AND LOWER({trimmed_content_expr}) NOT LIKE '┌─ entity search:%' " From 3a0d745f8cf8c925d78a35a97f0d707011b16e34 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 18:51:59 +0300 Subject: [PATCH 13/15] fix: cap filtered knn overfetch --- src/brainlayer/search_repo.py | 5 +++-- tests/test_audit_recursion_filter.py | 2 ++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index aaa68b2a..62997d6a 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -32,6 +32,7 @@ _HYBRID_CACHE_MAX = 128 # max entries (LRU eviction) _MMR_CANDIDATE_LIMIT = 50 _MMR_LAMBDA = 0.65 +_FILTERED_KNN_MAX = 2000 META_NOISE_PATTERNS = [ "brain_search(", "brain_entity(", @@ -318,7 +319,7 @@ def _checkpoint_filtered_knn_k(self, n_results: int, include_checkpoints: bool) if checkpoint_count <= 0: return n_results - return n_results + checkpoint_count + return min(n_results + checkpoint_count, max(n_results, _FILTERED_KNN_MAX)) def _audit_filtered_knn_k(self, n_results: int, include_audit: bool) -> int: if include_audit: @@ -326,7 +327,7 @@ def _audit_filtered_knn_k(self, n_results: int, include_audit: bool) -> int: audit_count = self._audit_recursion_count() if audit_count <= 0: return n_results - return n_results + audit_count + return min(n_results + audit_count, max(n_results, _FILTERED_KNN_MAX)) def _effective_knn_k( self, diff --git a/tests/test_audit_recursion_filter.py b/tests/test_audit_recursion_filter.py index 3854df96..201f6490 100644 --- a/tests/test_audit_recursion_filter.py +++ b/tests/test_audit_recursion_filter.py @@ -260,6 +260,8 @@ def test_audit_overfetch_scales_with_filtered_row_count(tmp_path, monkeypatch): monkeypatch.setattr(store, "_audit_recursion_count", lambda: 1500) assert store._effective_knn_k(3, needs_overfetch=False, include_checkpoints=True, include_audit=False) == 1503 + monkeypatch.setattr(store, "_audit_recursion_count", lambda: 5000) + assert store._effective_knn_k(3, needs_overfetch=False, include_checkpoints=True, include_audit=False) == 2000 finally: store.close() From e2accbc9409ee3ba5b598f7ca007c091f4547e77 Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 19:04:27 +0300 Subject: [PATCH 14/15] fix: retry filtered knn underfills --- src/brainlayer/search_repo.py | 58 +++++++++++++++++++++------ tests/test_precompact_chunk_origin.py | 2 +- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index 62997d6a..a35701e4 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -282,7 +282,13 @@ def _checkpoint_cache_data_version(self) -> int | None: time.sleep(0.05 * (2**attempt)) return None - def _checkpoint_filtered_knn_k(self, n_results: int, include_checkpoints: bool) -> int: + def _checkpoint_filtered_knn_k( + self, + n_results: int, + include_checkpoints: bool, + *, + cap_filtered: bool = True, + ) -> int: if include_checkpoints or not getattr(self, "_has_chunk_origin", True): return n_results @@ -319,15 +325,21 @@ def _checkpoint_filtered_knn_k(self, n_results: int, include_checkpoints: bool) if checkpoint_count <= 0: return n_results - return min(n_results + checkpoint_count, max(n_results, _FILTERED_KNN_MAX)) + expanded_k = n_results + checkpoint_count + if not cap_filtered: + return expanded_k + return min(expanded_k, max(n_results, _FILTERED_KNN_MAX)) - def _audit_filtered_knn_k(self, n_results: int, include_audit: bool) -> int: + def _audit_filtered_knn_k(self, n_results: int, include_audit: bool, *, cap_filtered: bool = True) -> int: if include_audit: return n_results audit_count = self._audit_recursion_count() if audit_count <= 0: return n_results - return min(n_results + audit_count, max(n_results, _FILTERED_KNN_MAX)) + expanded_k = n_results + audit_count + if not cap_filtered: + return expanded_k + return min(expanded_k, max(n_results, _FILTERED_KNN_MAX)) def _effective_knn_k( self, @@ -335,12 +347,18 @@ def _effective_knn_k( needs_overfetch: bool, include_checkpoints: bool, include_audit: bool, + *, + cap_filtered: bool = True, ) -> int: effective_k = n_results if needs_overfetch: effective_k = max(effective_k, min(n_results * 10, 1000)) - effective_k = self._checkpoint_filtered_knn_k(effective_k, include_checkpoints) - return self._audit_filtered_knn_k(effective_k, include_audit) + effective_k = self._checkpoint_filtered_knn_k( + effective_k, + include_checkpoints, + cap_filtered=cap_filtered, + ) + return self._audit_filtered_knn_k(effective_k, include_audit, cap_filtered=cap_filtered) def _load_chunk_embeddings(self, chunk_ids: List[str]) -> Dict[str, np.ndarray]: """Fetch float embeddings for the provided chunk IDs.""" @@ -619,6 +637,16 @@ def search( """ results = list(cursor.execute(query, params)) + if len(results) < n_results: + retry_k = self._effective_knn_k( + n_results, + bool(needs_overfetch), + include_checkpoints, + include_audit, + cap_filtered=False, + ) + if retry_k > effective_k: + results = list(cursor.execute(query, [query_bytes, retry_k] + filter_params)) results = results[:n_results] elif query_text is not None: @@ -950,9 +978,7 @@ def _binary_search( ) effective_k = self._effective_knn_k(n_results, bool(needs_overfetch), include_checkpoints, include_audit) params = [query_bytes, effective_k] + filter_params - results = list( - cursor.execute( - f""" + query = f""" SELECT c.id, c.content, c.metadata, c.source_file, c.project, c.content_type, c.value_type, c.char_count, v.distance, @@ -962,10 +988,18 @@ def _binary_search( JOIN chunks c ON v.chunk_id = c.id WHERE v.embedding MATCH vec_quantize_binary(?) AND k = ? {where_sql} ORDER BY v.distance - """, - params, + """ + results = list(cursor.execute(query, params)) + if len(results) < n_results: + retry_k = self._effective_knn_k( + n_results, + bool(needs_overfetch), + include_checkpoints, + include_audit, + cap_filtered=False, ) - ) + if retry_k > effective_k: + results = list(cursor.execute(query, [query_bytes, retry_k] + filter_params)) results = results[:n_results] ids = [] diff --git a/tests/test_precompact_chunk_origin.py b/tests/test_precompact_chunk_origin.py index 37e22ce6..add73347 100644 --- a/tests/test_precompact_chunk_origin.py +++ b/tests/test_precompact_chunk_origin.py @@ -404,7 +404,7 @@ def test_vector_search_overfetches_when_checkpoint_filter_discards_nearest_neigh def test_vector_search_does_not_starve_normal_results_after_many_checkpoints(tmp_path): store = VectorStore(tmp_path / "vector-many-checkpoints.db") query_embedding = [0.0] * 1024 - for index in range(1001): + for index in range(2001): _insert_chunk( store, chunk_id=f"checkpoint-{index}", From 25cbf6dc3a85479b25feb3987719d9764882b42a Mon Sep 17 00:00:00 2001 From: Test User Date: Sat, 16 May 2026 19:24:25 +0300 Subject: [PATCH 15/15] fix: filter audit evidence from entity recall --- src/brainlayer/kg_repo.py | 16 +++++-- src/brainlayer/mcp/__init__.py | 6 +++ src/brainlayer/mcp/entity_handler.py | 2 + src/brainlayer/mcp/search_handler.py | 2 +- src/brainlayer/pipeline/digest.py | 3 +- src/brainlayer/search_repo.py | 12 ++--- tests/test_3tool_aliases.py | 15 +++++- tests/test_audit_recursion_filter.py | 69 ++++++++++++++++++++++++++++ 8 files changed, 112 insertions(+), 13 deletions(-) diff --git a/src/brainlayer/kg_repo.py b/src/brainlayer/kg_repo.py index 34beb0b9..fbe4042a 100644 --- a/src/brainlayer/kg_repo.py +++ b/src/brainlayer/kg_repo.py @@ -480,17 +480,27 @@ def set_entity_parent(self, entity_id: str, parent_id: str) -> None: (parent_id, entity_id), ) - def get_entity_chunks(self, entity_id: str, limit: int = 20) -> List[Dict[str, Any]]: + def get_entity_chunks( + self, + entity_id: str, + limit: int = 20, + *, + include_audit: bool = False, + ) -> List[Dict[str, Any]]: """Get chunks linked to an entity, ordered by relevance.""" cursor = self._read_cursor() + where_clauses = ["ec.entity_id = ?"] + if not include_audit: + where_clauses.append(self._audit_recursion_exclusion_sql("c.id", "c.tags", "c.content")) + where_sql = " AND ".join(where_clauses) rows = list( cursor.execute( - """ + f""" SELECT ec.chunk_id, ec.relevance, ec.context, ec.mention_type, c.content, c.source_file, c.project, c.content_type, c.created_at FROM kg_entity_chunks ec JOIN chunks c ON ec.chunk_id = c.id - WHERE ec.entity_id = ? + WHERE {where_sql} ORDER BY ec.relevance DESC LIMIT ? """, diff --git a/src/brainlayer/mcp/__init__.py b/src/brainlayer/mcp/__init__.py index f45fe5a5..fa2fb01a 100644 --- a/src/brainlayer/mcp/__init__.py +++ b/src/brainlayer/mcp/__init__.py @@ -944,6 +944,11 @@ async def list_tools() -> list[Tool]: "minimum": 0, "description": "Pagination offset for list action.", }, + "include_audit": { + "type": "boolean", + "default": False, + "description": "Opt in to audit/eval and recursive MCP-output evidence. Defaults false to prevent audit-recursion pollution.", + }, }, "required": [], } @@ -1407,6 +1412,7 @@ async def call_tool(name: str, arguments: dict[str, Any]): mode="entity", query=query, entity_type=arguments.get("entity_type"), + include_audit=arguments.get("include_audit", False), ) ) diff --git a/src/brainlayer/mcp/entity_handler.py b/src/brainlayer/mcp/entity_handler.py index 2b71312b..ef32b563 100644 --- a/src/brainlayer/mcp/entity_handler.py +++ b/src/brainlayer/mcp/entity_handler.py @@ -16,6 +16,7 @@ async def _brain_entity( query: str, entity_type: str | None = None, + include_audit: bool = False, ) -> CallToolResult: """Handle brain_entity tool call.""" from ..pipeline.digest import entity_lookup @@ -32,6 +33,7 @@ async def _brain_entity( store=store, embed_fn=model.embed_query, entity_type=entity_type, + include_audit=include_audit, ), ) except Exception as e: diff --git a/src/brainlayer/mcp/search_handler.py b/src/brainlayer/mcp/search_handler.py index de139a5e..890c759d 100644 --- a/src/brainlayer/mcp/search_handler.py +++ b/src/brainlayer/mcp/search_handler.py @@ -1018,7 +1018,7 @@ async def _brain_recall( return _error_result("query is required for mode=entity") from .entity_handler import _brain_entity as _entity_handler - return await _entity_handler(query=query, entity_type=entity_type) + return await _entity_handler(query=query, entity_type=entity_type, include_audit=include_audit) # --- Original modes --- diff --git a/src/brainlayer/pipeline/digest.py b/src/brainlayer/pipeline/digest.py index ab63ee92..1ee3ed95 100644 --- a/src/brainlayer/pipeline/digest.py +++ b/src/brainlayer/pipeline/digest.py @@ -795,6 +795,7 @@ def entity_lookup( store: VectorStore, embed_fn: Callable[[str], List[float]], entity_type: Optional[str] = None, + include_audit: bool = False, ) -> Optional[Dict[str, Any]]: """Look up an entity by name, returning structured info with relations and evidence. @@ -838,7 +839,7 @@ def entity_lookup( ] # Get evidence chunks - evidence_raw = store.get_entity_chunks(entity_id, limit=10) + evidence_raw = store.get_entity_chunks(entity_id, limit=10, include_audit=include_audit) evidence = [ { "chunk_id": e["chunk_id"], diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index a35701e4..0842f431 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -165,14 +165,14 @@ def _audit_recursion_exclusion_sql( content_expr = f"COALESCE(CAST({content_expr} AS TEXT), '')" compact_content_expr = ( - f"REPLACE(REPLACE(REPLACE(REPLACE(LOWER({content_expr}), ' ', ''), char(9), ''), char(10), ''), char(13), '')" + f"REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(REPLACE(LOWER({content_expr}), ' ', ''), char(9), ''), " + "char(10), ''), char(11), ''), char(12), ''), char(13), '')" ) - trimmed_content_expr = f"LTRIM({content_expr}, char(9) || char(10) || char(11) || char(12) || char(13) || char(32))" recursive_content_filter = ( - f"LOWER({trimmed_content_expr}) NOT GLOB '┌─ brain_search:*' " - f"AND LOWER({trimmed_content_expr}) NOT GLOB '┌─ brain_*:*' " - f"AND LOWER({trimmed_content_expr}) NOT LIKE '┌─ entity:%' " - f"AND LOWER({trimmed_content_expr}) NOT LIKE '┌─ entity search:%' " + f"{compact_content_expr} NOT GLOB '┌─brain_search:*' " + f"AND {compact_content_expr} NOT GLOB '┌─brain_*:*' " + f"AND {compact_content_expr} NOT LIKE '┌─entity:%' " + f"AND {compact_content_expr} NOT LIKE '┌─entitysearch:%' " f"AND LOWER({content_expr}) NOT LIKE '%mcp brainlayer memory: invalid json-rpc message%' " f'AND {compact_content_expr} NOT LIKE \'%"jsonrpc":"2.0"%\'' ) diff --git a/tests/test_3tool_aliases.py b/tests/test_3tool_aliases.py index c4b42587..58872c49 100644 --- a/tests/test_3tool_aliases.py +++ b/tests/test_3tool_aliases.py @@ -143,7 +143,7 @@ def test_entity_mode_delegates(self): ) as mock_entity: asyncio.run(_brain_recall(mode="entity", query="BrainLayer")) - mock_entity.assert_called_once_with(query="BrainLayer", entity_type=None) + mock_entity.assert_called_once_with(query="BrainLayer", entity_type=None, include_audit=False) def test_entity_mode_passes_entity_type(self): """mode=entity passes entity_type filter.""" @@ -154,7 +154,18 @@ def test_entity_mode_passes_entity_type(self): ) as mock_entity: asyncio.run(_brain_recall(mode="entity", query="Etan", entity_type="person")) - mock_entity.assert_called_once_with(query="Etan", entity_type="person") + mock_entity.assert_called_once_with(query="Etan", entity_type="person", include_audit=False) + + def test_entity_mode_passes_include_audit(self): + """mode=entity passes audit opt-in to entity evidence lookup.""" + with patch( + "brainlayer.mcp.entity_handler._brain_entity", + new_callable=AsyncMock, + return_value=MagicMock(), + ) as mock_entity: + asyncio.run(_brain_recall(mode="entity", query="Etan", include_audit=True)) + + mock_entity.assert_called_once_with(query="Etan", entity_type=None, include_audit=True) def test_entity_mode_requires_query(self): """mode=entity without query returns error.""" diff --git a/tests/test_audit_recursion_filter.py b/tests/test_audit_recursion_filter.py index 201f6490..129c459f 100644 --- a/tests/test_audit_recursion_filter.py +++ b/tests/test_audit_recursion_filter.py @@ -473,6 +473,41 @@ def test_brain_search_box_with_leading_non_space_whitespace_is_filtered_by_sql_p store.close() +def test_brain_search_box_without_space_is_filtered_by_sql_paths(tmp_path): + store = VectorStore(tmp_path / "recursive-no-space-box-filter.db") + try: + query_embedding = [0.0935] * 1024 + _insert_chunk( + store, + "recursive-no-space-box", + '┌─brain_search: "BrainLayer audit recursion"\n│ recursive output', + ["auto-detected"], + query_embedding, + ) + _insert_chunk( + store, + "ordinary-no-space-control", + "BrainLayer MCP guard should keep ordinary memories visible", + ["brainlayer", "mcp"], + [0.0936] * 1024, + ) + store.build_binary_index() + + vector_results = store.search(query_embedding=query_embedding, n_results=5) + hybrid_results = store.hybrid_search( + query_embedding=query_embedding, + query_text="BrainLayer MCP guard", + n_results=5, + ) + + assert "recursive-no-space-box" not in vector_results["ids"][0] + assert "ordinary-no-space-control" in vector_results["ids"][0] + assert "recursive-no-space-box" not in hybrid_results["ids"][0] + assert "ordinary-no-space-control" in hybrid_results["ids"][0] + finally: + store.close() + + def test_audit_recursion_sql_accepts_explicit_content_expression(): clause = _audit_recursion_exclusion_sql( "c.id", @@ -550,6 +585,40 @@ def test_kg_facts_exclude_audit_sourced_relations_by_default(tmp_path): store.close() +def test_entity_chunks_exclude_audit_evidence_by_default(tmp_path): + store = VectorStore(tmp_path / "audit-filter-entity-evidence.db") + try: + query_embedding = [0.0945] * 1024 + _insert_chunk( + store, + "audit-entity-evidence", + '┌─brain_search: "Etan recursive evidence"\n│ recursive output', + ["auto-detected"], + query_embedding, + ) + _insert_chunk( + store, + "normal-entity-evidence", + "Etan owns a normal BrainLayer decision", + ["brainlayer"], + [0.0946] * 1024, + ) + store.upsert_entity("person-etan", "person", "Etan") + store.link_entity_chunk("person-etan", "audit-entity-evidence", relevance=1.0) + store.link_entity_chunk("person-etan", "normal-entity-evidence", relevance=0.9) + + default_chunks = store.get_entity_chunks("person-etan", limit=10) + audit_chunks = store.get_entity_chunks("person-etan", limit=10, include_audit=True) + + assert [chunk["chunk_id"] for chunk in default_chunks] == ["normal-entity-evidence"] + assert {chunk["chunk_id"] for chunk in audit_chunks} == { + "audit-entity-evidence", + "normal-entity-evidence", + } + finally: + store.close() + + @pytest.mark.asyncio async def test_chunk_context_filters_audit_neighbors_by_default(tmp_path, monkeypatch): store = VectorStore(tmp_path / "audit-context-filter.db")