From a324baeb52909d03bc7a516ef72a0a02f4ccda68 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Mon, 25 May 2026 21:53:07 +0300 Subject: [PATCH 1/3] feat(search): add profile instrumentation --- .../BrainBar/HybridSearchHelperClient.swift | 38 ++++++++--- brain-bar/Sources/BrainBar/MCPRouter.swift | 18 +++++- .../Sources/BrainBar/QuickCapturePanel.swift | 47 +++++++++++++- .../BrainBar/SearchProfileLogger.swift | 64 +++++++++++++++++++ .../BrainBarDaemon/SearchProfileLogger.swift | 64 +++++++++++++++++++ ...-search-profile-instrumentation-mandate.md | 34 ++++++++++ src/brainlayer/brainbar_hybrid_helper.py | 51 ++++++++++++--- src/brainlayer/mcp/search_handler.py | 23 ++++++- src/brainlayer/search_profile.py | 46 +++++++++++++ src/brainlayer/search_repo.py | 39 +++++++++++ tests/test_search_profile.py | 54 ++++++++++++++++ 11 files changed, 455 insertions(+), 23 deletions(-) create mode 100644 brain-bar/Sources/BrainBar/SearchProfileLogger.swift create mode 100644 brain-bar/Sources/BrainBarDaemon/SearchProfileLogger.swift create mode 100644 docs.local/audits/2026-05-25-search-profile-instrumentation-mandate.md create mode 100644 src/brainlayer/search_profile.py create mode 100644 tests/test_search_profile.py diff --git a/brain-bar/Sources/BrainBar/HybridSearchHelperClient.swift b/brain-bar/Sources/BrainBar/HybridSearchHelperClient.swift index 770aee75..5915c3b2 100644 --- a/brain-bar/Sources/BrainBar/HybridSearchHelperClient.swift +++ b/brain-bar/Sources/BrainBar/HybridSearchHelperClient.swift @@ -134,16 +134,38 @@ final class HybridSearchHelperClient: HybridSearchClientProtocol, @unchecked Sen } func search(arguments: [String: Any]) throws -> HybridSearchResponse { - try queue.sync { - try startLocked() - do { - return try send(arguments: arguments) - } catch { - if Self.shouldRestartHelper(after: error) { - stopLocked() + let profileQueryID = (arguments["_profile_query_id"] as? String) + ?? (SearchProfileLogger.isEnabled ? SearchProfileLogger.newQueryID() : nil) + let profileStartedAt = SearchProfileLogger.now() + SearchProfileLogger.log(scope: "search.brainbar", step: "helper_rpc_start", queryID: profileQueryID) + do { + let response = try queue.sync { + try startLocked() + do { + return try send(arguments: arguments) + } catch { + if Self.shouldRestartHelper(after: error) { + stopLocked() + } + throw error } - throw error } + SearchProfileLogger.log( + scope: "search.brainbar", + step: "helper_rpc_done", + queryID: profileQueryID, + durMS: SearchProfileLogger.durationMS(since: profileStartedAt) + ) + return response + } catch { + SearchProfileLogger.log( + scope: "search.brainbar", + step: "helper_rpc_done", + queryID: profileQueryID, + durMS: SearchProfileLogger.durationMS(since: profileStartedAt), + fields: ["error": String(describing: error)] + ) + throw error } } diff --git a/brain-bar/Sources/BrainBar/MCPRouter.swift b/brain-bar/Sources/BrainBar/MCPRouter.swift index 914e0f06..66896ec0 100644 --- a/brain-bar/Sources/BrainBar/MCPRouter.swift +++ b/brain-bar/Sources/BrainBar/MCPRouter.swift @@ -236,6 +236,9 @@ final class MCPRouter: @unchecked Sendable { // MARK: - Tool Handlers private func handleBrainSearch(_ args: [String: Any]) throws -> ToolOutput { + let profileStartedAt = SearchProfileLogger.now() + let profileQueryID = (args["_profile_query_id"] as? String) + ?? (SearchProfileLogger.isEnabled ? SearchProfileLogger.newQueryID() : nil) guard let query = args["query"] as? String else { throw ToolError.missingParameter("query") } @@ -262,6 +265,12 @@ final class MCPRouter: @unchecked Sendable { guard let db = database else { throw ToolError.noDatabase } + SearchProfileLogger.log( + scope: "search.brainbar", + step: "router_dispatch", + queryID: profileQueryID, + durMS: SearchProfileLogger.durationMS(since: profileStartedAt) + ) func localKGSection() -> String { let hasActiveFilters = project != nil || sourceCountsAsFilter || tag != nil || subscriberID != nil || importanceMin != nil @@ -309,7 +318,8 @@ final class MCPRouter: @unchecked Sendable { source: source, tag: tag, importanceMin: importanceMin, - detail: args["detail"] as? String + detail: args["detail"] as? String, + profileQueryID: profileQueryID )) textSection = response.text metadata = sanitizedHybridMetadata(response.metadata) @@ -350,7 +360,8 @@ final class MCPRouter: @unchecked Sendable { source: String?, tag: String?, importanceMin: Double?, - detail: String? + detail: String?, + profileQueryID: String? ) -> [String: Any] { var arguments: [String: Any] = [ "query": query, @@ -367,6 +378,9 @@ final class MCPRouter: @unchecked Sendable { if let importanceMin { arguments["importance_min"] = importanceMin } + if let profileQueryID { + arguments["_profile_query_id"] = profileQueryID + } return arguments } diff --git a/brain-bar/Sources/BrainBar/QuickCapturePanel.swift b/brain-bar/Sources/BrainBar/QuickCapturePanel.swift index 4766c98a..af1a5a08 100644 --- a/brain-bar/Sources/BrainBar/QuickCapturePanel.swift +++ b/brain-bar/Sources/BrainBar/QuickCapturePanel.swift @@ -110,6 +110,8 @@ final class QuickCaptureViewModel: ObservableObject { private var copyResetTask: Task? private var feedbackResetTask: Task? private let searchDebounceDelay: Duration + private var lastSearchKeystrokeAt: TimeInterval? + private var lastSearchQueryID: String? var _pendingSearchTask: Task? /// Exposed for test awaiting — set when an async store is in flight. var _pendingStoreTask: Task? @@ -272,6 +274,10 @@ final class QuickCaptureViewModel: ObservableObject { _pendingSearchTask?.cancel() return } + if SearchProfileLogger.isEnabled { + lastSearchKeystrokeAt = SearchProfileLogger.now() + lastSearchQueryID = SearchProfileLogger.newQueryID() + } scheduleDebouncedSearch() } @@ -372,19 +378,42 @@ final class QuickCaptureViewModel: ObservableObject { private func scheduleDebouncedSearch() { let query = inputText + let profileKeystrokeAt = lastSearchKeystrokeAt + let profileQueryID = lastSearchQueryID _pendingSearchTask?.cancel() _pendingSearchTask = Task { @MainActor [weak self] in guard let self else { return } try? await Task.sleep(for: searchDebounceDelay) guard !Task.isCancelled, query == inputText else { return } - submitSearch(query: query, cancelPending: false) + let submitStartedAt = SearchProfileLogger.now() + if let profileKeystrokeAt { + SearchProfileLogger.log( + scope: "search.brainbar", + step: "keystroke_submit", + queryID: profileQueryID, + durMS: SearchProfileLogger.durationMS(since: profileKeystrokeAt) + ) + } + submitSearch( + query: query, + cancelPending: false, + profileQueryID: profileQueryID, + profileSubmitStartedAt: submitStartedAt + ) } } - private func submitSearch(query: String? = nil, cancelPending: Bool = true) { + private func submitSearch( + query: String? = nil, + cancelPending: Bool = true, + profileQueryID: String? = nil, + profileSubmitStartedAt: TimeInterval? = nil + ) { if cancelPending { _pendingSearchTask?.cancel() } + let profileQueryID = profileQueryID ?? (SearchProfileLogger.isEnabled ? SearchProfileLogger.newQueryID() : nil) + let submitStartedAt = profileSubmitStartedAt ?? SearchProfileLogger.now() let query = query ?? inputText do { let searchResult = try search(query, 8) @@ -392,11 +421,25 @@ final class QuickCaptureViewModel: ObservableObject { selectedResultIndex = results.isEmpty ? nil : 0 copiedResultID = nil feedback = .idle + SearchProfileLogger.log( + scope: "search.brainbar", + step: "render_done", + queryID: profileQueryID, + durMS: SearchProfileLogger.durationMS(since: submitStartedAt), + fields: ["result_count": results.count] + ) } catch { results = [] selectedResultIndex = nil copiedResultID = nil feedback = .error(error.localizedDescription) + SearchProfileLogger.log( + scope: "search.brainbar", + step: "render_done", + queryID: profileQueryID, + durMS: SearchProfileLogger.durationMS(since: submitStartedAt), + fields: ["error": String(describing: error)] + ) } } diff --git a/brain-bar/Sources/BrainBar/SearchProfileLogger.swift b/brain-bar/Sources/BrainBar/SearchProfileLogger.swift new file mode 100644 index 00000000..86df71b6 --- /dev/null +++ b/brain-bar/Sources/BrainBar/SearchProfileLogger.swift @@ -0,0 +1,64 @@ +import Foundation + +enum SearchProfileLogger { + static var isEnabled: Bool { + ProcessInfo.processInfo.environment["BRAINLAYER_SEARCH_PROFILE"] == "1" + } + + static func newQueryID() -> String { + "q-\(UUID().uuidString.replacingOccurrences(of: "-", with: "").prefix(12))" + } + + static func now() -> TimeInterval { + ProcessInfo.processInfo.systemUptime + } + + static func durationMS(since startedAt: TimeInterval) -> Double { + ((now() - startedAt) * 1000).rounded(toPlaces: 3) + } + + static func log( + scope: String, + step: String, + queryID: String?, + durMS: Double? = nil, + fields: [String: Any] = [:] + ) { + guard isEnabled else { return } + + var event: [String: Any] = [ + "ts": isoTimestamp(), + "scope": scope, + "step": step + ] + if let queryID { + event["query_id"] = queryID + } + if let durMS { + event["dur_ms"] = durMS + } + for (key, value) in fields { + event[key] = value + } + + guard JSONSerialization.isValidJSONObject(event), + let data = try? JSONSerialization.data(withJSONObject: event, options: [.sortedKeys]), + let line = String(data: data, encoding: .utf8) else { + return + } + NSLog("%@", line) + } + + private static func isoTimestamp() -> String { + let formatter = ISO8601DateFormatter() + formatter.formatOptions = [.withInternetDateTime, .withFractionalSeconds] + return formatter.string(from: Date()) + } +} + +private extension Double { + func rounded(toPlaces places: Int) -> Double { + let divisor = pow(10.0, Double(places)) + return (self * divisor).rounded() / divisor + } +} diff --git a/brain-bar/Sources/BrainBarDaemon/SearchProfileLogger.swift b/brain-bar/Sources/BrainBarDaemon/SearchProfileLogger.swift new file mode 100644 index 00000000..86df71b6 --- /dev/null +++ b/brain-bar/Sources/BrainBarDaemon/SearchProfileLogger.swift @@ -0,0 +1,64 @@ +import Foundation + +enum SearchProfileLogger { + static var isEnabled: Bool { + ProcessInfo.processInfo.environment["BRAINLAYER_SEARCH_PROFILE"] == "1" + } + + static func newQueryID() -> String { + "q-\(UUID().uuidString.replacingOccurrences(of: "-", with: "").prefix(12))" + } + + static func now() -> TimeInterval { + ProcessInfo.processInfo.systemUptime + } + + static func durationMS(since startedAt: TimeInterval) -> Double { + ((now() - startedAt) * 1000).rounded(toPlaces: 3) + } + + static func log( + scope: String, + step: String, + queryID: String?, + durMS: Double? = nil, + fields: [String: Any] = [:] + ) { + guard isEnabled else { return } + + var event: [String: Any] = [ + "ts": isoTimestamp(), + "scope": scope, + "step": step + ] + if let queryID { + event["query_id"] = queryID + } + if let durMS { + event["dur_ms"] = durMS + } + for (key, value) in fields { + event[key] = value + } + + guard JSONSerialization.isValidJSONObject(event), + let data = try? JSONSerialization.data(withJSONObject: event, options: [.sortedKeys]), + let line = String(data: data, encoding: .utf8) else { + return + } + NSLog("%@", line) + } + + private static func isoTimestamp() -> String { + let formatter = ISO8601DateFormatter() + formatter.formatOptions = [.withInternetDateTime, .withFractionalSeconds] + return formatter.string(from: Date()) + } +} + +private extension Double { + func rounded(toPlaces places: Int) -> Double { + let divisor = pow(10.0, Double(places)) + return (self * divisor).rounded() / divisor + } +} diff --git a/docs.local/audits/2026-05-25-search-profile-instrumentation-mandate.md b/docs.local/audits/2026-05-25-search-profile-instrumentation-mandate.md new file mode 100644 index 00000000..84500c27 --- /dev/null +++ b/docs.local/audits/2026-05-25-search-profile-instrumentation-mandate.md @@ -0,0 +1,34 @@ +# Phase 2.4-G Search Profile Instrumentation Mandate + +Status: instrumentation-only PR. + +Etan reported search latency around 15 seconds after Phase 2.4-F/PR #320 was expected to make warm hybrid search fast. This PR deliberately does not optimize the search path. It adds opt-in timing logs so the next pass can identify the slow segment from data instead of speculation. + +## Enablement + +Set `BRAINLAYER_SEARCH_PROFILE=1` to emit verbose timing events. With the flag unset, the instrumentation is silent. + +Each event is a single JSON object with: + +- `ts`: UTC ISO timestamp +- `scope`: `search.brainbar`, `search.helper`, `search.mcp`, or the supplied search scope +- `step`: measured step name +- `query_id`: best-effort correlation ID +- `dur_ms`: duration for completed timed steps, when applicable + +## Instrumented Points + +1. BrainBar command bar keystroke debounce to `submitSearch`: `step=keystroke_submit`. +2. BrainBar MCP router dispatch into `handleBrainSearch`: `step=router_dispatch`. +3. BrainBar helper RPC start and completion: `step=helper_rpc_start`, `step=helper_rpc_done`. +4. Python helper/MCP embedding call: `step=embed`. +5. Python hybrid search leg: `step=hybrid_search`. +6. Hybrid internals: `step=binary_knn`, `step=float_rerank`, and `step=fts_merge`. +7. Helper startup warm state: `step=startup_warm_state`, including `warm_called`, `binary_index_available`, and `binary_knn_mmap_size`. +8. BrainBar result rendering completion: `step=render_done`. + +## Reading Guidance + +Run a real query with the flag enabled and group by `query_id`. Compare `dur_ms` across `embed`, `binary_knn`, `float_rerank`, `fts_merge`, `hybrid_search`, `helper_rpc_done`, and `render_done`. The largest duration is the first candidate for the follow-up performance fix. + +Do not infer a performance fix from this PR alone. Use the captured production log lines first. diff --git a/src/brainlayer/brainbar_hybrid_helper.py b/src/brainlayer/brainbar_hybrid_helper.py index 1e84ae5c..b5a0d01c 100644 --- a/src/brainlayer/brainbar_hybrid_helper.py +++ b/src/brainlayer/brainbar_hybrid_helper.py @@ -20,6 +20,8 @@ from pathlib import Path from typing import Any +from . import search_profile + _ACCEPT_TIMEOUT_SECONDS = 0.25 _CONNECTION_TIMEOUT_SECONDS = 5.0 _WARMUP_RETRY_DELAYS_SECONDS = (0.05, 0.1) @@ -43,12 +45,21 @@ def __init__(self, socket_path: Path, db_path: Path): self.socket_path = socket_path self.db_path = db_path self._stopped = False + self._warm_called = False def warm(self) -> None: + self._warm_called = True os.environ["BRAINLAYER_DB"] = os.fspath(self.db_path) from brainlayer.mcp._shared import _get_embedding_model, _get_search_vector_store store = _get_search_vector_store() + search_profile.emit( + "search.helper", + "startup_warm_state", + warm_called=self._warm_called, + binary_index_available=bool(getattr(store, "_binary_index_available", False)), + binary_knn_mmap_size=self._store_mmap_size(store), + ) model = _get_embedding_model() warmup_query = "brainbar hybrid helper warmup" query_embedding = model.embed_query(warmup_query) @@ -161,19 +172,27 @@ def _handle_request(self, request: dict[str, Any]) -> dict[str, Any]: async def _search(self, arguments: dict[str, Any]) -> tuple[str, dict[str, Any] | None]: from brainlayer.mcp.search_handler import _brain_search + query_id = str(arguments.get("_profile_query_id") or "") or None + if search_profile.enabled() and query_id is None: + query_id = search_profile.new_query_id() source = arguments.get("source") if source is None or source == "": source = "all" - result = await _brain_search( - query=str(arguments.get("query") or ""), - project=arguments.get("project"), - source=source, - tag=arguments.get("tag"), - importance_min=arguments.get("importance_min"), - num_results=int(arguments.get("num_results") or 5), - detail=str(arguments.get("detail") or "compact"), - ) + search_kwargs = { + "query": str(arguments.get("query") or ""), + "project": arguments.get("project"), + "source": source, + "tag": arguments.get("tag"), + "importance_min": arguments.get("importance_min"), + "num_results": int(arguments.get("num_results") or 5), + "detail": str(arguments.get("detail") or "compact"), + } + if search_profile.enabled() or query_id is not None: + search_kwargs["profile_query_id"] = query_id + search_kwargs["profile_scope"] = "search.helper" + + result = await _brain_search(**search_kwargs) if isinstance(result, tuple): content, structured = result @@ -202,6 +221,20 @@ def _content_text(content: Any) -> str: return str(text) return str(content) + @staticmethod + def _store_mmap_size(store: Any) -> int | None: + try: + cursor = store._read_cursor() + row = cursor.execute("PRAGMA mmap_size").fetchone() + except Exception: + return None + if not row: + return None + try: + return int(row[0]) + except (TypeError, ValueError): + return None + def parse_args(argv: list[str] | None = None) -> argparse.Namespace: parser = argparse.ArgumentParser(description="BrainBar persistent hybrid search helper") diff --git a/src/brainlayer/mcp/search_handler.py b/src/brainlayer/mcp/search_handler.py index 62b17281..c7d30c90 100644 --- a/src/brainlayer/mcp/search_handler.py +++ b/src/brainlayer/mcp/search_handler.py @@ -11,7 +11,7 @@ import apsw from mcp.types import TextContent -from .. import telemetry +from .. import search_profile, telemetry from .._helpers import _escape_fts5_query, _is_sqlite_busy_error from ..chunk_origin import CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT, is_precompact_checkpoint_content from ..lexical_defense import _normalize_surface, load_lexical_defense_dictionary @@ -485,8 +485,13 @@ async def _brain_search( correction_category: str | None = None, include_checkpoints: bool = False, include_audit: bool = False, + profile_query_id: str | None = None, + profile_scope: str = "search.mcp", ): """Unified search dispatcher -- routes to the right internal handler.""" + if search_profile.enabled() and profile_query_id is None: + profile_query_id = search_profile.new_query_id() + profile_started = search_profile.now() if detail not in _VALID_SEARCH_DETAILS: return _error_result(f"Invalid detail='{detail}'. Must be one of: {sorted(_VALID_SEARCH_DETAILS)}") @@ -522,6 +527,8 @@ async def _brain_search( correction_category=correction_category, include_checkpoints=include_checkpoints, include_audit=include_audit, + profile_query_id=profile_query_id, + profile_scope=profile_scope, ) if chunk_id is not None: @@ -804,7 +811,7 @@ def _emit_kg_degrade(reason: str) -> None: query, ) - return await _search( + result = await _search( query=query, project=project, content_type=content_type, @@ -822,7 +829,11 @@ def _emit_kg_degrade(reason: str) -> None: correction_category=correction_category, include_checkpoints=include_checkpoints, include_audit=include_audit, + profile_query_id=profile_query_id, + profile_scope=profile_scope, ) + search_profile.emit(profile_scope, "brain_search", profile_query_id, search_profile.dur_ms(profile_started)) + return result def _escape_like_pattern(value: str) -> str: @@ -1141,6 +1152,8 @@ async def _search( correction_category: str | None = None, include_checkpoints: bool = False, include_audit: bool = False, + profile_query_id: str | None = None, + profile_scope: str = "search.mcp", ): """Execute a hybrid search query (semantic + keyword via RRF). Retries on BusyError.""" try: @@ -1165,7 +1178,9 @@ async def _search( normalized_project = _normalize_project_name(project) loop = asyncio.get_running_loop() model = _get_embedding_model() + embed_started = search_profile.now() query_embedding = await loop.run_in_executor(None, model.embed_query, query) + search_profile.emit(profile_scope, "embed", profile_query_id, search_profile.dur_ms(embed_started)) if source == "all": source_filter = None @@ -1182,6 +1197,7 @@ async def _search( # Retry hybrid_search on BusyError — WAL reads shouldn't block but # they can during checkpoint or when enrichment holds exclusive lock. results = None + hybrid_started = search_profile.now() for attempt in range(_RETRY_MAX_ATTEMPTS): try: results = store.hybrid_search( @@ -1203,6 +1219,8 @@ async def _search( correction_category=correction_category, include_checkpoints=include_checkpoints, include_audit=include_audit, + profile_query_id=profile_query_id, + profile_scope=profile_scope, ) break except Exception as e: @@ -1215,6 +1233,7 @@ async def _search( await asyncio.sleep(delay) continue raise # Non-lock error or retries exhausted + search_profile.emit(profile_scope, "hybrid_search", profile_query_id, search_profile.dur_ms(hybrid_started)) if not results["documents"][0]: empty = {"query": query, "total": 0, "results": []} diff --git a/src/brainlayer/search_profile.py b/src/brainlayer/search_profile.py new file mode 100644 index 00000000..7bef0877 --- /dev/null +++ b/src/brainlayer/search_profile.py @@ -0,0 +1,46 @@ +"""Opt-in search latency profile logging.""" + +from __future__ import annotations + +import json +import logging +import os +import time +import uuid +from datetime import datetime, timezone +from typing import Any + +logger = logging.getLogger(__name__) + + +def enabled() -> bool: + return os.environ.get("BRAINLAYER_SEARCH_PROFILE") == "1" + + +def new_query_id() -> str: + return f"q-{uuid.uuid4().hex[:12]}" + + +def now() -> float: + return time.perf_counter() + + +def dur_ms(started_at: float) -> float: + return round((time.perf_counter() - started_at) * 1000, 3) + + +def emit(scope: str, step: str, query_id: str | None = None, dur_ms: float | None = None, **fields: Any) -> None: + if not enabled(): + return + + event: dict[str, Any] = { + "ts": datetime.now(timezone.utc).isoformat(timespec="milliseconds").replace("+00:00", "Z"), + "scope": scope, + "step": step, + } + if query_id: + event["query_id"] = query_id + if dur_ms is not None: + event["dur_ms"] = dur_ms + event.update(fields) + logger.info(json.dumps(event, sort_keys=True, separators=(",", ":"))) diff --git a/src/brainlayer/search_repo.py b/src/brainlayer/search_repo.py index ecedcbdf..9ee72705 100644 --- a/src/brainlayer/search_repo.py +++ b/src/brainlayer/search_repo.py @@ -15,6 +15,7 @@ import apsw import numpy as np +from . import search_profile from ._helpers import _escape_fts5_query, _is_sqlite_busy_error, serialize_f32 from .chunk_origin import CHUNK_ORIGIN_PRECOMPACT_CHECKPOINT, is_precompact_checkpoint_content from .dedupe import resolve_chunk_id @@ -1239,6 +1240,8 @@ def hybrid_search( correction_category: Optional[str] = None, filter_meta_noise: bool = True, include_audit: bool = False, + profile_query_id: str | None = None, + profile_scope: str = "search.repo", ) -> Dict[str, List]: """Hybrid search combining semantic (vector) + keyword (FTS5) via Reciprocal Rank Fusion. @@ -1290,6 +1293,7 @@ def hybrid_search( # when the binary index is unavailable (for example readonly live DBs). candidate_fetch_count = max(n_results * 3, _MMR_CANDIDATE_LIMIT) if getattr(self, "_binary_index_available", False): + binary_started = search_profile.now() semantic = self._binary_search( query_embedding=query_embedding, n_results=candidate_fetch_count, @@ -1311,8 +1315,25 @@ def hybrid_search( correction_category=correction_category, include_audit=include_audit, ) + search_profile.emit( + profile_scope, + "binary_knn", + profile_query_id, + search_profile.dur_ms(binary_started), + binary_index_available=True, + candidate_count=len(semantic.get("ids", [[]])[0]), + ) + rerank_started = search_profile.now() semantic = self._rerank_binary_results_with_float(query_embedding, semantic) + search_profile.emit( + profile_scope, + "float_rerank", + profile_query_id, + search_profile.dur_ms(rerank_started), + candidate_count=len(semantic.get("ids", [[]])[0]), + ) else: + float_started = search_profile.now() semantic = self.search( query_embedding=query_embedding, n_results=candidate_fetch_count, @@ -1334,6 +1355,14 @@ def hybrid_search( correction_category=correction_category, include_audit=include_audit, ) + search_profile.emit( + profile_scope, + "float_search", + profile_query_id, + search_profile.dur_ms(float_started), + binary_index_available=False, + candidate_count=len(semantic.get("ids", [[]])[0]), + ) # Build semantic rank map: chunk_content -> rank semantic_ranks = {} @@ -1349,6 +1378,7 @@ def hybrid_search( fts_query = fts_query_override or _escape_fts5_query(query_text) fts_results = [] trigram_fts_results = [] + fts_started = search_profile.now() if fts_query: fts_extra = [] fts_filter_params: list = [] @@ -1434,6 +1464,15 @@ def _fetch_fts_rows(table_name: str) -> list[tuple]: fts_results = _fetch_fts_rows("chunks_fts") if getattr(self, "_trigram_fts_available", False): trigram_fts_results = _fetch_fts_rows("chunks_fts_trigram") + search_profile.emit( + profile_scope, + "fts_merge", + profile_query_id, + search_profile.dur_ms(fts_started), + fts_enabled=bool(fts_query), + fts_count=len(fts_results), + trigram_count=len(trigram_fts_results), + ) # Build FTS rank map fts_ranks = {} diff --git a/tests/test_search_profile.py b/tests/test_search_profile.py new file mode 100644 index 00000000..d43d240c --- /dev/null +++ b/tests/test_search_profile.py @@ -0,0 +1,54 @@ +import json +import logging + +import pytest + +from brainlayer.mcp.search_handler import _brain_search + + +class FakeEmbeddingModel: + def embed_query(self, _query): + return [0.1, 0.2, 0.3] + + +class FakeSearchStore: + def count(self): + return 1 + + def hybrid_search(self, **_kwargs): + return { + "ids": [["chunk-profile-1"]], + "documents": [["auth refactor profile result"]], + "metadatas": [[{"source_file": "test.md", "project": "brainlayer"}]], + "distances": [[0.25]], + } + + def enrich_results_with_session_context(self, results): + return results + + +@pytest.mark.asyncio +async def test_brain_search_profile_flag_emits_timing_json(monkeypatch, caplog): + monkeypatch.setenv("BRAINLAYER_SEARCH_PROFILE", "1") + monkeypatch.setattr("brainlayer.mcp.search_handler._get_vector_store", lambda: FakeSearchStore()) + monkeypatch.setattr("brainlayer.mcp.search_handler._get_embedding_model", lambda: FakeEmbeddingModel()) + monkeypatch.setattr("brainlayer.mcp.search_handler._expanded_fts_query", lambda *_args, **_kwargs: None) + monkeypatch.setattr("brainlayer.mcp.search_handler._detect_entities", lambda *_args, **_kwargs: []) + + caplog.set_level(logging.INFO) + + await _brain_search(query="auth refactor", project="brainlayer", source="all", detail="compact") + + profile_events = [] + for record in caplog.records: + try: + event = json.loads(record.getMessage()) + except json.JSONDecodeError: + continue + if str(event.get("scope", "")).startswith("search."): + profile_events.append(event) + + assert len(profile_events) >= 3 + assert {event["step"] for event in profile_events} >= {"brain_search", "embed", "hybrid_search"} + assert all("query_id" in event for event in profile_events) + assert all("ts" in event for event in profile_events) From 8c81107c476757483e852109c88394b77baf4fa7 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Mon, 25 May 2026 22:05:37 +0300 Subject: [PATCH 2/3] fix(search): profile failed hybrid searches --- ...-search-profile-instrumentation-mandate.md | 1 + src/brainlayer/mcp/search_handler.py | 98 ++++++++++++------- tests/test_search_profile.py | 43 ++++++-- 3 files changed, 98 insertions(+), 44 deletions(-) diff --git a/docs.local/audits/2026-05-25-search-profile-instrumentation-mandate.md b/docs.local/audits/2026-05-25-search-profile-instrumentation-mandate.md index 84500c27..d49ea05b 100644 --- a/docs.local/audits/2026-05-25-search-profile-instrumentation-mandate.md +++ b/docs.local/audits/2026-05-25-search-profile-instrumentation-mandate.md @@ -15,6 +15,7 @@ Each event is a single JSON object with: - `step`: measured step name - `query_id`: best-effort correlation ID - `dur_ms`: duration for completed timed steps, when applicable +- `error`: exception class for failed timed steps, when applicable ## Instrumented Points diff --git a/src/brainlayer/mcp/search_handler.py b/src/brainlayer/mcp/search_handler.py index c7d30c90..129b5172 100644 --- a/src/brainlayer/mcp/search_handler.py +++ b/src/brainlayer/mcp/search_handler.py @@ -1179,7 +1179,17 @@ async def _search( loop = asyncio.get_running_loop() model = _get_embedding_model() embed_started = search_profile.now() - query_embedding = await loop.run_in_executor(None, model.embed_query, query) + try: + query_embedding = await loop.run_in_executor(None, model.embed_query, query) + except Exception as exc: + search_profile.emit( + profile_scope, + "embed", + profile_query_id, + search_profile.dur_ms(embed_started), + error=exc.__class__.__name__, + ) + raise search_profile.emit(profile_scope, "embed", profile_query_id, search_profile.dur_ms(embed_started)) if source == "all": @@ -1198,42 +1208,58 @@ async def _search( # they can during checkpoint or when enrichment holds exclusive lock. results = None hybrid_started = search_profile.now() - for attempt in range(_RETRY_MAX_ATTEMPTS): - try: - results = store.hybrid_search( - query_embedding=query_embedding, - query_text=query, - fts_query_override=fts_query_override, - n_results=num_results, - project_filter=normalized_project, - content_type_filter=content_type, - source_filter=source_filter, - tag_filter=tag, - intent_filter=intent, - importance_min=importance_min, - date_from=date_from, - date_to=date_to, - sentiment_filter=sentiment, - entity_id=entity_id, - source_filter_like=source_filter_like, - correction_category=correction_category, - include_checkpoints=include_checkpoints, - include_audit=include_audit, - profile_query_id=profile_query_id, - profile_scope=profile_scope, - ) - break - except Exception as e: - is_lock = isinstance(e, apsw.BusyError) or "locked" in str(e).lower() or "busy" in str(e).lower() - if is_lock and attempt < _RETRY_MAX_ATTEMPTS - 1: - delay = _retry_delay * (2**attempt) - logger.warning( - "Search BusyError (attempt %d/%d), retrying in %.2fs", attempt + 1, _RETRY_MAX_ATTEMPTS, delay + hybrid_error: str | None = None + try: + for attempt in range(_RETRY_MAX_ATTEMPTS): + try: + results = store.hybrid_search( + query_embedding=query_embedding, + query_text=query, + fts_query_override=fts_query_override, + n_results=num_results, + project_filter=normalized_project, + content_type_filter=content_type, + source_filter=source_filter, + tag_filter=tag, + intent_filter=intent, + importance_min=importance_min, + date_from=date_from, + date_to=date_to, + sentiment_filter=sentiment, + entity_id=entity_id, + source_filter_like=source_filter_like, + correction_category=correction_category, + include_checkpoints=include_checkpoints, + include_audit=include_audit, + profile_query_id=profile_query_id, + profile_scope=profile_scope, ) - await asyncio.sleep(delay) - continue - raise # Non-lock error or retries exhausted - search_profile.emit(profile_scope, "hybrid_search", profile_query_id, search_profile.dur_ms(hybrid_started)) + break + except Exception as e: + is_lock = isinstance(e, apsw.BusyError) or "locked" in str(e).lower() or "busy" in str(e).lower() + if is_lock and attempt < _RETRY_MAX_ATTEMPTS - 1: + delay = _retry_delay * (2**attempt) + logger.warning( + "Search BusyError (attempt %d/%d), retrying in %.2fs", + attempt + 1, + _RETRY_MAX_ATTEMPTS, + delay, + ) + await asyncio.sleep(delay) + continue + raise # Non-lock error or retries exhausted + except Exception as exc: + hybrid_error = exc.__class__.__name__ + raise + finally: + fields = {"error": hybrid_error} if hybrid_error else {} + search_profile.emit( + profile_scope, + "hybrid_search", + profile_query_id, + search_profile.dur_ms(hybrid_started), + **fields, + ) if not results["documents"][0]: empty = {"query": query, "total": 0, "results": []} diff --git a/tests/test_search_profile.py b/tests/test_search_profile.py index d43d240c..14e8d81d 100644 --- a/tests/test_search_profile.py +++ b/tests/test_search_profile.py @@ -27,6 +27,23 @@ def enrich_results_with_session_context(self, results): return results +class FakeFailingSearchStore(FakeSearchStore): + def hybrid_search(self, **_kwargs): + raise RuntimeError("profile failure") + + +def _profile_events(caplog): + events = [] + for record in caplog.records: + try: + event = json.loads(record.getMessage()) + except json.JSONDecodeError: + continue + if str(event.get("scope", "")).startswith("search."): + events.append(event) + return events + + @pytest.mark.asyncio async def test_brain_search_profile_flag_emits_timing_json(monkeypatch, caplog): monkeypatch.setenv("BRAINLAYER_SEARCH_PROFILE", "1") @@ -39,16 +56,26 @@ async def test_brain_search_profile_flag_emits_timing_json(monkeypatch, caplog): await _brain_search(query="auth refactor", project="brainlayer", source="all", detail="compact") - profile_events = [] - for record in caplog.records: - try: - event = json.loads(record.getMessage()) - except json.JSONDecodeError: - continue - if str(event.get("scope", "")).startswith("search."): - profile_events.append(event) + profile_events = _profile_events(caplog) assert len(profile_events) >= 3 assert {event["step"] for event in profile_events} >= {"brain_search", "embed", "hybrid_search"} assert all("query_id" in event for event in profile_events) assert all("ts" in event for event in profile_events) + + +@pytest.mark.asyncio +async def test_brain_search_profile_flag_emits_failed_hybrid_timing(monkeypatch, caplog): + monkeypatch.setenv("BRAINLAYER_SEARCH_PROFILE", "1") + monkeypatch.setattr("brainlayer.mcp.search_handler._get_vector_store", lambda: FakeFailingSearchStore()) + monkeypatch.setattr("brainlayer.mcp.search_handler._get_embedding_model", lambda: FakeEmbeddingModel()) + monkeypatch.setattr("brainlayer.mcp.search_handler._expanded_fts_query", lambda *_args, **_kwargs: None) + monkeypatch.setattr("brainlayer.mcp.search_handler._detect_entities", lambda *_args, **_kwargs: []) + + caplog.set_level(logging.INFO) + + await _brain_search(query="auth refactor", project="brainlayer", source="all", detail="compact") + + hybrid_events = [event for event in _profile_events(caplog) if event.get("step") == "hybrid_search"] + assert len(hybrid_events) == 1 + assert hybrid_events[0]["error"] == "RuntimeError" From cd7ab401479eb07dfb515ff31d3ee2c89ba01cf7 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Mon, 25 May 2026 22:16:24 +0300 Subject: [PATCH 3/3] fix(search): close profile trace gaps --- .../BrainBar/SearchProfileLogger.swift | 3 +- .../BrainBarDaemon/SearchProfileLogger.swift | 3 +- src/brainlayer/mcp/search_handler.py | 59 +++++++++++++- src/brainlayer/search_profile.py | 20 ++++- tests/test_search_profile.py | 77 +++++++++++++++++++ 5 files changed, 157 insertions(+), 5 deletions(-) diff --git a/brain-bar/Sources/BrainBar/SearchProfileLogger.swift b/brain-bar/Sources/BrainBar/SearchProfileLogger.swift index 86df71b6..4f8af1fa 100644 --- a/brain-bar/Sources/BrainBar/SearchProfileLogger.swift +++ b/brain-bar/Sources/BrainBar/SearchProfileLogger.swift @@ -37,7 +37,8 @@ enum SearchProfileLogger { if let durMS { event["dur_ms"] = durMS } - for (key, value) in fields { + let reservedKeys: Set = ["ts", "scope", "step", "query_id", "dur_ms"] + for (key, value) in fields where !reservedKeys.contains(key) { event[key] = value } diff --git a/brain-bar/Sources/BrainBarDaemon/SearchProfileLogger.swift b/brain-bar/Sources/BrainBarDaemon/SearchProfileLogger.swift index 86df71b6..4f8af1fa 100644 --- a/brain-bar/Sources/BrainBarDaemon/SearchProfileLogger.swift +++ b/brain-bar/Sources/BrainBarDaemon/SearchProfileLogger.swift @@ -37,7 +37,8 @@ enum SearchProfileLogger { if let durMS { event["dur_ms"] = durMS } - for (key, value) in fields { + let reservedKeys: Set = ["ts", "scope", "step", "query_id", "dur_ms"] + for (key, value) in fields where !reservedKeys.contains(key) { event[key] = value } diff --git a/src/brainlayer/mcp/search_handler.py b/src/brainlayer/mcp/search_handler.py index 129b5172..d3959490 100644 --- a/src/brainlayer/mcp/search_handler.py +++ b/src/brainlayer/mcp/search_handler.py @@ -492,7 +492,63 @@ async def _brain_search( if search_profile.enabled() and profile_query_id is None: profile_query_id = search_profile.new_query_id() profile_started = search_profile.now() + try: + return await _brain_search_dispatch( + query=query, + project=project, + file_path=file_path, + chunk_id=chunk_id, + content_type=content_type, + source=source, + tag=tag, + intent=intent, + importance_min=importance_min, + date_from=date_from, + date_to=date_to, + sentiment=sentiment, + entity_id=entity_id, + num_results=num_results, + before=before, + after=after, + max_results=max_results, + detail=detail, + source_filter=source_filter, + correction_category=correction_category, + include_checkpoints=include_checkpoints, + include_audit=include_audit, + profile_query_id=profile_query_id, + profile_scope=profile_scope, + ) + finally: + search_profile.emit(profile_scope, "brain_search", profile_query_id, search_profile.dur_ms(profile_started)) + +async def _brain_search_dispatch( + query: str, + project: str | None = None, + file_path: str | None = None, + chunk_id: str | None = None, + content_type: str | None = None, + source: str | None = None, + tag: str | None = None, + intent: str | None = None, + importance_min: float | None = None, + date_from: str | None = None, + date_to: str | None = None, + sentiment: str | None = None, + entity_id: str | None = None, + num_results: int = 5, + before: int = 3, + after: int = 3, + max_results: int = 10, + detail: str = "compact", + source_filter: str | None = None, + correction_category: str | None = None, + include_checkpoints: bool = False, + include_audit: bool = False, + profile_query_id: str | None = None, + profile_scope: str = "search.mcp", +): if detail not in _VALID_SEARCH_DETAILS: return _error_result(f"Invalid detail='{detail}'. Must be one of: {sorted(_VALID_SEARCH_DETAILS)}") if num_results < _MIN_PUBLIC_NUM_RESULTS or num_results > _MAX_PUBLIC_NUM_RESULTS: @@ -614,6 +670,8 @@ async def _brain_search( correction_category=correction_category, include_checkpoints=include_checkpoints, include_audit=include_audit, + profile_query_id=profile_query_id, + profile_scope=profile_scope, ) if _query_signals_current_context(query): @@ -832,7 +890,6 @@ def _emit_kg_degrade(reason: str) -> None: profile_query_id=profile_query_id, profile_scope=profile_scope, ) - search_profile.emit(profile_scope, "brain_search", profile_query_id, search_profile.dur_ms(profile_started)) return result diff --git a/src/brainlayer/search_profile.py b/src/brainlayer/search_profile.py index 7bef0877..ebbcd4aa 100644 --- a/src/brainlayer/search_profile.py +++ b/src/brainlayer/search_profile.py @@ -11,6 +11,7 @@ from typing import Any logger = logging.getLogger(__name__) +_RESERVED_FIELDS = {"ts", "scope", "step", "query_id", "dur_ms"} def enabled() -> bool: @@ -42,5 +43,20 @@ def emit(scope: str, step: str, query_id: str | None = None, dur_ms: float | Non event["query_id"] = query_id if dur_ms is not None: event["dur_ms"] = dur_ms - event.update(fields) - logger.info(json.dumps(event, sort_keys=True, separators=(",", ":"))) + for key, value in fields.items(): + if key not in _RESERVED_FIELDS: + event[key] = value + try: + payload = json.dumps(event, sort_keys=True, separators=(",", ":")) + except TypeError: + safe_event = {key: value if _is_json_safe(value) else repr(value) for key, value in event.items()} + payload = json.dumps(safe_event, sort_keys=True, separators=(",", ":")) + logger.info(payload) + + +def _is_json_safe(value: Any) -> bool: + try: + json.dumps(value) + except TypeError: + return False + return True diff --git a/tests/test_search_profile.py b/tests/test_search_profile.py index 14e8d81d..80f41d49 100644 --- a/tests/test_search_profile.py +++ b/tests/test_search_profile.py @@ -3,6 +3,7 @@ import pytest +from brainlayer import search_profile from brainlayer.mcp.search_handler import _brain_search @@ -79,3 +80,79 @@ async def test_brain_search_profile_flag_emits_failed_hybrid_timing(monkeypatch, hybrid_events = [event for event in _profile_events(caplog) if event.get("step") == "hybrid_search"] assert len(hybrid_events) == 1 assert hybrid_events[0]["error"] == "RuntimeError" + + +@pytest.mark.asyncio +async def test_brain_search_profile_flag_emits_for_file_path_return(monkeypatch, caplog): + async def fake_file_timeline(**_kwargs): + return [] + + async def fake_recall(**_kwargs): + return ([], {}) + + monkeypatch.setenv("BRAINLAYER_SEARCH_PROFILE", "1") + monkeypatch.setattr("brainlayer.mcp.search_handler._file_timeline", fake_file_timeline) + monkeypatch.setattr("brainlayer.mcp.search_handler._recall", fake_recall) + + caplog.set_level(logging.INFO) + + await _brain_search(query="notes for auth refactor", file_path="auth.md", project="brainlayer") + + brain_search_events = [event for event in _profile_events(caplog) if event.get("step") == "brain_search"] + assert len(brain_search_events) == 1 + assert brain_search_events[0]["scope"] == "search.mcp" + + +@pytest.mark.asyncio +async def test_brain_search_profile_keeps_query_id_across_file_path_recursion(monkeypatch, caplog): + async def fake_file_timeline(**_kwargs): + return [] + + async def fake_recall(**_kwargs): + return ([], {}) + + monkeypatch.setenv("BRAINLAYER_SEARCH_PROFILE", "1") + monkeypatch.setattr("brainlayer.mcp.search_handler._extract_file_path", lambda _query: "auth.md") + monkeypatch.setattr("brainlayer.mcp.search_handler._file_timeline", fake_file_timeline) + monkeypatch.setattr("brainlayer.mcp.search_handler._recall", fake_recall) + + caplog.set_level(logging.INFO) + + await _brain_search(query="show auth.md", project="brainlayer") + + brain_search_events = [event for event in _profile_events(caplog) if event.get("step") == "brain_search"] + assert len(brain_search_events) == 2 + assert len({event["query_id"] for event in brain_search_events}) == 1 + + +def test_search_profile_emit_preserves_reserved_keys(monkeypatch, caplog): + monkeypatch.setenv("BRAINLAYER_SEARCH_PROFILE", "1") + caplog.set_level(logging.INFO, logger="brainlayer.search_profile") + + search_profile.emit( + "search.mcp", + "brain_search", + "q-good", + 12.3, + ts="bad", + rows=1, + ) + + events = _profile_events(caplog) + assert len(events) == 1 + assert events[0]["scope"] == "search.mcp" + assert events[0]["step"] == "brain_search" + assert events[0]["query_id"] == "q-good" + assert events[0]["dur_ms"] == 12.3 + assert events[0]["rows"] == 1 + + +def test_search_profile_emit_stringifies_non_json_fields(monkeypatch, caplog): + monkeypatch.setenv("BRAINLAYER_SEARCH_PROFILE", "1") + caplog.set_level(logging.INFO, logger="brainlayer.search_profile") + + search_profile.emit("search.mcp", "brain_search", details={"values": {1, 2}}) + + events = _profile_events(caplog) + assert len(events) == 1 + assert events[0]["details"] == "{'values': {1, 2}}"