From bd4f7e7369f9181515f32884dbe255e65e76c017 Mon Sep 17 00:00:00 2001 From: Etan Joseph Heyman Date: Sat, 18 Apr 2026 20:54:09 +0300 Subject: [PATCH] feat(ingest): checkpoint multi-agent ingestion wiring --- .../launchd/com.brainlayer.agent-ingest.plist | 44 +++ scripts/launchd/install.sh | 8 +- src/brainlayer/agent_watch.py | 139 +++++++++ src/brainlayer/cli/__init__.py | 176 +++++++++++ src/brainlayer/index_new.py | 33 ++- src/brainlayer/ingest/cursor.py | 275 ++++++++++++++++++ src/brainlayer/ingest/gemini.py | 249 ++++++++++++++++ tests/test_agent_ingest_launchd.py | 23 ++ tests/test_agent_session_watcher.py | 72 +++++ tests/test_ingest_cursor.py | 92 ++++++ tests/test_ingest_gemini.py | 78 +++++ 11 files changed, 1172 insertions(+), 17 deletions(-) create mode 100644 scripts/launchd/com.brainlayer.agent-ingest.plist create mode 100644 src/brainlayer/agent_watch.py create mode 100644 src/brainlayer/ingest/cursor.py create mode 100644 src/brainlayer/ingest/gemini.py create mode 100644 tests/test_agent_ingest_launchd.py create mode 100644 tests/test_agent_session_watcher.py create mode 100644 tests/test_ingest_cursor.py create mode 100644 tests/test_ingest_gemini.py diff --git a/scripts/launchd/com.brainlayer.agent-ingest.plist b/scripts/launchd/com.brainlayer.agent-ingest.plist new file mode 100644 index 00000000..d857355b --- /dev/null +++ b/scripts/launchd/com.brainlayer.agent-ingest.plist @@ -0,0 +1,44 @@ + + + + + Label + com.brainlayer.agent-ingest + + ProgramArguments + + __BRAINLAYER_BIN__ + watch-agents + --poll + 30.0 + + + StandardOutPath + __HOME__/.local/share/brainlayer/logs/agent-ingest.log + StandardErrorPath + __HOME__/.local/share/brainlayer/logs/agent-ingest.err + + EnvironmentVariables + + PATH + /usr/local/bin:/usr/bin:/bin:__HOME__/.local/bin + PYTHONUNBUFFERED + 1 + + + KeepAlive + + + RunAtLoad + + + ThrottleInterval + 5 + + Nice + 10 + + ProcessType + Background + + diff --git a/scripts/launchd/install.sh b/scripts/launchd/install.sh 
index 205bcf4a..2f590fd6 100755 --- a/scripts/launchd/install.sh +++ b/scripts/launchd/install.sh @@ -6,6 +6,7 @@ # ./scripts/launchd/install.sh index # Install indexing only # ./scripts/launchd/install.sh enrich # Install enrichment only # ./scripts/launchd/install.sh decay # Install decay only +# ./scripts/launchd/install.sh agent-ingest # Install multi-agent ingest watcher # ./scripts/launchd/install.sh load enrichment # ./scripts/launchd/install.sh unload enrichment # ./scripts/launchd/install.sh checkpoint # Install WAL checkpoint only @@ -118,6 +119,9 @@ case "${1:-all}" in decay) install_plist decay ;; + agent-ingest) + install_plist agent-ingest + ;; load) load_plist "${2:-enrichment}" ;; @@ -129,6 +133,7 @@ case "${1:-all}" in ;; all) install_plist index + install_plist agent-ingest install_plist enrichment install_plist decay install_plist wal-checkpoint @@ -137,13 +142,14 @@ case "${1:-all}" in ;; remove) remove_plist index + remove_plist agent-ingest 2>/dev/null || true remove_plist enrich 2>/dev/null || true remove_plist enrichment 2>/dev/null || true remove_plist decay 2>/dev/null || true remove_plist wal-checkpoint ;; *) - echo "Usage: $0 [index|enrich|enrichment|decay|load [name]|unload [name]|checkpoint|all|remove]" + echo "Usage: $0 [index|agent-ingest|enrich|enrichment|decay|load [name]|unload [name]|checkpoint|all|remove]" exit 1 ;; esac diff --git a/src/brainlayer/agent_watch.py b/src/brainlayer/agent_watch.py new file mode 100644 index 00000000..914bcd14 --- /dev/null +++ b/src/brainlayer/agent_watch.py @@ -0,0 +1,139 @@ +"""Polling watcher for Codex, Cursor, and Gemini session artifacts.""" + +from __future__ import annotations + +import json +import logging +import os +import tempfile +import threading +import time +from dataclasses import dataclass +from pathlib import Path +from typing import Callable + +logger = logging.getLogger(__name__) + + +class AgentSessionRegistry: + """Persists mtime/size state for agent session files.""" + + def 
__init__(self, path: str | Path): + self.path = Path(path) + self._data: dict[str, dict[str, int]] = {} + self._dirty = False + self._load() + + def _load(self) -> None: + try: + with open(self.path) as fh: + data = json.load(fh) + if isinstance(data, dict): + self._data = data + except (OSError, json.JSONDecodeError, ValueError): + self._data = {} + + def get(self, filepath: str) -> dict[str, int] | None: + return self._data.get(filepath) + + def set(self, filepath: str, *, mtime_ns: int, size: int) -> None: + self._data[filepath] = {"mtime_ns": mtime_ns, "size": size} + self._dirty = True + + def flush(self) -> bool: + if not self._dirty: + return True + + tmp_path = None + try: + self.path.parent.mkdir(parents=True, exist_ok=True) + fd, tmp_path = tempfile.mkstemp(dir=str(self.path.parent), suffix=".tmp") + with os.fdopen(fd, "w") as fh: + json.dump(self._data, fh) + os.rename(tmp_path, self.path) + self._dirty = False + return True + except OSError as exc: + logger.warning("Failed to flush agent registry: %s", exc) + if tmp_path: + try: + os.unlink(tmp_path) + except OSError: + pass + return False + + +@dataclass(frozen=True) +class AgentSessionSource: + name: str + patterns: list[str] + ingest: Callable[[Path], int] + root: Path + + +class AgentSessionWatcher: + """Poll source roots and ingest files whose size/mtime changed.""" + + def __init__( + self, + registry_path: str | Path, + sources: list[AgentSessionSource], + poll_interval_s: float = 30.0, + registry_flush_interval_s: float = 5.0, + ): + self.registry = AgentSessionRegistry(registry_path) + self.sources = sources + self.poll_interval_s = poll_interval_s + self.registry_flush_interval_s = registry_flush_interval_s + self._stop = threading.Event() + self._last_registry_flush = time.monotonic() + + def _discover_files(self, source: AgentSessionSource) -> list[Path]: + files: list[Path] = [] + if not source.root.exists(): + return files + for pattern in source.patterns: + try: + files.extend(path for 
path in source.root.glob(pattern) if path.is_file()) + except OSError: + continue + return sorted(set(files)) + + def poll_once(self) -> int: + processed = 0 + for source in self.sources: + for file_path in self._discover_files(source): + try: + stat = file_path.stat() + except OSError: + continue + + state = {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size} + previous = self.registry.get(str(file_path)) + if previous == state: + continue + + indexed = source.ingest(file_path) + logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed) + self.registry.set(str(file_path), **state) + processed += 1 + + now = time.monotonic() + if now - self._last_registry_flush >= self.registry_flush_interval_s: + self.registry.flush() + self._last_registry_flush = now + return processed + + def start(self) -> None: + while not self._stop.is_set(): + try: + self.poll_once() + except Exception as exc: + logger.error("Agent watcher poll failed: %s", exc) + self._stop.wait(self.poll_interval_s) + + self.registry.flush() + + def stop(self) -> None: + self._stop.set() + diff --git a/src/brainlayer/cli/__init__.py b/src/brainlayer/cli/__init__.py index 198a460d..5cda4cd5 100644 --- a/src/brainlayer/cli/__init__.py +++ b/src/brainlayer/cli/__init__.py @@ -1875,6 +1875,182 @@ def ingest_codex( raise typer.Exit(1) +@app.command("ingest-cursor") +def ingest_cursor( + path: Optional[Path] = typer.Argument( + None, + help="Cursor transcript JSONL file or projects directory (default: ~/.cursor/projects)", + ), + project: str = typer.Option(None, "--project", "-p", help="Override project name"), + since_days: int = typer.Option(None, "--since-days", "-d", help="Only process last N days"), + dry_run: bool = typer.Option(False, "--dry-run", help="Parse but do not write to DB"), + verbose: bool = typer.Option(False, "--verbose", "-v", help="Print each classified entry"), +) -> None: + """Ingest Cursor agent transcripts into BrainLayer.""" + from ..ingest.cursor import 
@app.command("ingest-cursor")
def ingest_cursor(
    path: Optional[Path] = typer.Argument(
        None,
        help="Cursor transcript JSONL file or projects directory (default: ~/.cursor/projects)",
    ),
    # Annotations fixed: these default to None, so they must be Optional
    # (matches the Optional[Path] argument above).
    project: Optional[str] = typer.Option(None, "--project", "-p", help="Override project name"),
    since_days: Optional[int] = typer.Option(None, "--since-days", "-d", help="Only process last N days"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Parse but do not write to DB"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Print each classified entry"),
) -> None:
    """Ingest Cursor agent transcripts into BrainLayer.

    With a file argument, ingests that single transcript; otherwise scans the
    given (or default ~/.cursor/projects) directory recursively.
    """
    from ..ingest.cursor import ingest_cursor_dir, ingest_cursor_session
    from ..paths import DEFAULT_DB_PATH

    db_path = DEFAULT_DB_PATH

    try:
        if path and path.is_file():
            rprint(f"[bold blue]זיכרון[/] — Ingesting Cursor session: [bold]{path.name}[/]")
            with console.status("Indexing..."):
                n = ingest_cursor_session(
                    path,
                    db_path=db_path,
                    project_override=project,
                    dry_run=dry_run,
                    verbose=verbose,
                )
            rprint(f"[bold green]✓[/] Indexed [bold]{n}[/] chunks from {path.name}")
        else:
            sessions_root = path if path else None
            label = str(sessions_root) if sessions_root else "~/.cursor/projects"
            rprint(f"[bold blue]זיכרון[/] — Ingesting Cursor sessions from: [bold]{label}[/]")
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                TimeElapsedColumn(),
                console=console,
            ) as progress:
                task = progress.add_task("Scanning sessions...", total=None)
                files, chunks = ingest_cursor_dir(
                    sessions_dir=sessions_root,
                    db_path=db_path,
                    project_override=project,
                    since_days=since_days,
                    dry_run=dry_run,
                    verbose=verbose,
                )
                progress.update(task, description=f"Done — {files} files, {chunks:,} chunks")
            tag = " [dim](dry run)[/]" if dry_run else ""
            rprint(f"[bold green]✓[/] Processed [bold]{files}[/] session files, [bold]{chunks:,}[/] chunks{tag}")
    except FileNotFoundError as e:
        rprint(f"[bold red]Error:[/] {e}")
        raise typer.Exit(1)
    except Exception as e:
        rprint(f"[bold red]Error:[/] {e}")
        raise typer.Exit(1)


@app.command("ingest-gemini")
def ingest_gemini(
    path: Optional[Path] = typer.Argument(
        None,
        help="Gemini session JSON file or root directory (default: ~/.gemini/tmp)",
    ),
    # Annotations fixed: these default to None, so they must be Optional.
    project: Optional[str] = typer.Option(None, "--project", "-p", help="Override project name"),
    since_days: Optional[int] = typer.Option(None, "--since-days", "-d", help="Only process last N days"),
    dry_run: bool = typer.Option(False, "--dry-run", help="Parse but do not write to DB"),
    verbose: bool = typer.Option(False, "--verbose", "-v", help="Print each classified entry"),
) -> None:
    """Ingest Gemini session transcripts into BrainLayer.

    With a file argument, ingests that single session; otherwise scans the
    given (or default ~/.gemini/tmp) directory recursively.
    """
    from ..ingest.gemini import ingest_gemini_dir, ingest_gemini_session
    from ..paths import DEFAULT_DB_PATH

    db_path = DEFAULT_DB_PATH

    try:
        if path and path.is_file():
            rprint(f"[bold blue]זיכרון[/] — Ingesting Gemini session: [bold]{path.name}[/]")
            with console.status("Indexing..."):
                n = ingest_gemini_session(
                    path,
                    db_path=db_path,
                    project_override=project,
                    dry_run=dry_run,
                    verbose=verbose,
                )
            rprint(f"[bold green]✓[/] Indexed [bold]{n}[/] chunks from {path.name}")
        else:
            sessions_root = path if path else None
            label = str(sessions_root) if sessions_root else "~/.gemini/tmp"
            rprint(f"[bold blue]זיכרון[/] — Ingesting Gemini sessions from: [bold]{label}[/]")
            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                TimeElapsedColumn(),
                console=console,
            ) as progress:
                task = progress.add_task("Scanning sessions...", total=None)
                files, chunks = ingest_gemini_dir(
                    sessions_dir=sessions_root,
                    db_path=db_path,
                    project_override=project,
                    since_days=since_days,
                    dry_run=dry_run,
                    verbose=verbose,
                )
                progress.update(task, description=f"Done — {files} files, {chunks:,} chunks")
            tag = " [dim](dry run)[/]" if dry_run else ""
            rprint(f"[bold green]✓[/] Processed [bold]{files}[/] session files, [bold]{chunks:,}[/] chunks{tag}")
    except FileNotFoundError as e:
        rprint(f"[bold red]Error:[/] {e}")
        raise typer.Exit(1)
    except Exception as e:
        rprint(f"[bold red]Error:[/] {e}")
        raise typer.Exit(1)


@app.command("watch-agents")
def watch_agents(
    poll_interval: float = typer.Option(30.0, "--poll", help="Poll interval in seconds"),
) -> None:
    """Watch Codex, Cursor, and Gemini session roots and ingest changed files.

    Runs until SIGTERM/SIGINT; state lives next to the DB in
    agent-session-registry.json so restarts only re-ingest changed files.
    """
    import signal

    from ..agent_watch import AgentSessionSource, AgentSessionWatcher
    from ..ingest.codex import ingest_codex_session
    from ..ingest.cursor import ingest_cursor_session
    from ..ingest.gemini import ingest_gemini_session
    from ..paths import get_db_path

    db_path = get_db_path()
    registry_path = db_path.parent / "agent-session-registry.json"

    # All three lambdas close over the single db_path binding above — no
    # loop, so late-binding is not an issue here.
    watcher = AgentSessionWatcher(
        registry_path=registry_path,
        poll_interval_s=poll_interval,
        sources=[
            AgentSessionSource(
                name="codex_cli",
                patterns=["**/*.jsonl"],
                ingest=lambda path: ingest_codex_session(path, db_path=db_path),
                root=Path.home() / ".codex" / "sessions",
            ),
            AgentSessionSource(
                name="cursor",
                patterns=["**/agent-transcripts/**/*.jsonl"],
                ingest=lambda path: ingest_cursor_session(path, db_path=db_path),
                root=Path.home() / ".cursor" / "projects",
            ),
            AgentSessionSource(
                name="gemini",
                patterns=["**/chats/session-*.json"],
                ingest=lambda path: ingest_gemini_session(path, db_path=db_path),
                root=Path.home() / ".gemini" / "tmp",
            ),
        ],
    )

    rprint("[bold blue]זיכרון[/] Multi-agent session watcher")
    rprint(f"  Registry: [bold]{registry_path}[/]")
    rprint(f"  Poll: {poll_interval}s")
    rprint("  Sources: codex_cli, cursor, gemini")
    rprint()

    def handle_signal(signum, frame):
        rprint("\n[bold yellow]Stopping multi-agent watcher...[/]")
        watcher.stop()

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    watcher.start()
    rprint("[bold green]Done.[/]")
database.""" if not chunks: @@ -44,22 +45,22 @@ def index_chunks_to_sqlite( if not embedded_chunks: return 0 - # Try to get timestamp from source file (first JSONL message) - created_at = None - try: - import json as _json - - with open(source_file) as _f: - for _line in _f: - _line = _line.strip() - if not _line: - continue - _data = _json.loads(_line) - if "timestamp" in _data: - created_at = _data["timestamp"] - break - except Exception as e: - logger.debug("Could not extract timestamp from %s: %s", source_file, e) + if not created_at: + # Try to get timestamp from source file (first JSONL message) + try: + import json as _json + + with open(source_file) as _f: + for _line in _f: + _line = _line.strip() + if not _line: + continue + _data = _json.loads(_line) + if "timestamp" in _data: + created_at = _data["timestamp"] + break + except Exception as e: + logger.debug("Could not extract timestamp from %s: %s", source_file, e) if not created_at: from datetime import datetime, timezone diff --git a/src/brainlayer/ingest/cursor.py b/src/brainlayer/ingest/cursor.py new file mode 100644 index 00000000..acd9cd16 --- /dev/null +++ b/src/brainlayer/ingest/cursor.py @@ -0,0 +1,275 @@ +"""Cursor session ingestion adapter for BrainLayer.""" + +from __future__ import annotations + +import json +import logging +import re +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterator, List, Optional + +logger = logging.getLogger(__name__) + +_MIN_USER_LEN = 15 +_MIN_ASSISTANT_LEN = 30 + +_USER_QUERY_RE = re.compile(r"^\s*\s*(.*?)\s*\s*$", re.DOTALL) + + +def _extract_text(content) -> str: + """Extract text content from Cursor message blocks.""" + if isinstance(content, str): + return content + if isinstance(content, dict): + return str(content.get("text", "")) + if isinstance(content, list): + parts = [] + for block in content: + if isinstance(block, dict) and block.get("type") == "text": + parts.append(block.get("text", "")) + elif 
isinstance(block, str): + parts.append(block) + return "\n".join(part for part in parts if part) + return "" + + +def _normalize_project_name(raw: str) -> str: + """Convert encoded path-like project names to the repo leaf.""" + if not raw: + return raw + + for marker in ("-Gits-", "-Desktop-", "-projects-", "-config-"): + if marker in raw: + return raw.split(marker)[-1] + + return raw + + +def _extract_project_from_path(file_path: Path) -> Optional[str]: + parts = file_path.parts + if "projects" in parts: + idx = parts.index("projects") + 1 + if idx < len(parts): + return _normalize_project_name(parts[idx]) + return None + + +def _file_mtime_iso(file_path: Path) -> str: + return datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc).isoformat() + + +def _clean_user_text(text: str) -> str: + stripped = text.strip() + match = _USER_QUERY_RE.match(stripped) + if match: + return match.group(1).strip() + return stripped + + +def parse_cursor_session(file_path: Path) -> Iterator[dict]: + """Parse a Cursor agent transcript JSONL into normalized entries.""" + session_id = file_path.stem + project = _extract_project_from_path(file_path) + fallback_ts = _file_mtime_iso(file_path) + + with open(file_path, "rb") as fh: + for raw in fh: + raw = raw.strip() + if not raw: + continue + try: + line = json.loads(raw) + except (json.JSONDecodeError, ValueError): + continue + + role = line.get("role") + message = line.get("message") or {} + text = _extract_text(message.get("content")).strip() + if not text: + continue + + if role == "user": + text = _clean_user_text(text) + if len(text) < _MIN_USER_LEN: + continue + yield { + "content": text, + "content_type": "user_message", + "session_id": session_id, + "timestamp": fallback_ts, + "project": project, + "source": "cursor", + "metadata": { + "session_id": session_id, + "sender": "user", + "source_file": str(file_path), + }, + } + continue + + if role == "assistant": + ctype = "ai_code" if "```" in text else "assistant_text" + 
if ctype != "ai_code" and len(text) < _MIN_ASSISTANT_LEN: + continue + yield { + "content": text, + "content_type": ctype, + "session_id": session_id, + "timestamp": fallback_ts, + "project": project, + "source": "cursor", + "metadata": { + "session_id": session_id, + "sender": "assistant", + "source_file": str(file_path), + }, + } + + +def ingest_cursor_session( + file_path: Path, + db_path: Optional[Path] = None, + project_override: Optional[str] = None, + dry_run: bool = False, + verbose: bool = False, +) -> int: + """Ingest a single Cursor session transcript.""" + from ..index_new import index_chunks_to_sqlite + from ..pipeline.chunk import chunk_content + from ..pipeline.classify import ClassifiedContent, ContentType, ContentValue + + type_map = { + "user_message": (ContentType.USER_MESSAGE, ContentValue.HIGH), + "assistant_text": (ContentType.ASSISTANT_TEXT, ContentValue.MEDIUM), + "ai_code": (ContentType.AI_CODE, ContentValue.HIGH), + } + + all_chunks: List = [] + project = project_override + session_id: Optional[str] = None + session_ts: Optional[str] = None + + for entry in parse_cursor_session(file_path): + if project is None: + project = entry.get("project") + if session_id is None: + session_id = entry.get("session_id") + if session_ts is None: + session_ts = entry.get("timestamp") + + content_type, value = type_map.get(entry["content_type"], (ContentType.ASSISTANT_TEXT, ContentValue.MEDIUM)) + classified = ClassifiedContent( + content=entry["content"], + content_type=content_type, + value=value, + metadata={**entry.get("metadata", {}), "source": "cursor"}, + ) + chunks = chunk_content(classified) + all_chunks.extend(chunks) + if verbose: + print(f" [{entry['content_type']}] {entry['content'][:80]!r}") + + if not all_chunks: + logger.info("No indexable content in %s", file_path) + return 0 + + if dry_run: + print(f"Dry run: {len(all_chunks)} chunks from {file_path.name} (not stored)") + return len(all_chunks) + + if db_path is None: + from ..paths 
import DEFAULT_DB_PATH + + db_path = DEFAULT_DB_PATH + + for chunk in all_chunks: + chunk.metadata.setdefault("source", "cursor") + + indexed = index_chunks_to_sqlite( + all_chunks, + source_file=str(file_path), + project=project, + db_path=db_path, + created_at=session_ts, + ) + + if session_id: + try: + from ..vector_store import VectorStore + + with VectorStore(db_path) as store: + store.store_session_context( + session_id=session_id, + project=project or "cursor", + started_at=session_ts, + ended_at=None, + ) + except Exception as exc: + logger.debug("Could not store session context for %s: %s", session_id, exc) + + logger.info("Indexed %d chunks from %s (session %s, project %s)", indexed, file_path.name, session_id, project) + return indexed + + +def ingest_cursor_dir( + sessions_dir: Optional[Path] = None, + db_path: Optional[Path] = None, + project_override: Optional[str] = None, + since_days: Optional[int] = None, + dry_run: bool = False, + verbose: bool = False, +) -> tuple[int, int]: + """Ingest Cursor agent transcript files under ~/.cursor/projects.""" + if sessions_dir is None: + sessions_dir = Path.home() / ".cursor" / "projects" + + if not sessions_dir.exists(): + raise FileNotFoundError(f"Cursor projects directory not found: {sessions_dir}") + + jsonl_files = sorted(sessions_dir.glob("**/agent-transcripts/**/*.jsonl")) + if since_days is not None: + cutoff = datetime.now(timezone.utc).timestamp() - since_days * 86400 + jsonl_files = [f for f in jsonl_files if f.stat().st_mtime >= cutoff] + + if not jsonl_files: + logger.info("No Cursor transcript files found in %s", sessions_dir) + return 0, 0 + + if not dry_run and db_path is None: + from ..paths import DEFAULT_DB_PATH + + db_path = DEFAULT_DB_PATH + + already_indexed: set[str] = set() + if not dry_run and db_path and db_path.exists(): + try: + from ..vector_store import VectorStore + + with VectorStore(db_path) as store: + cursor = store._read_cursor() + rows = cursor.execute("SELECT DISTINCT 
source_file FROM chunks WHERE source = 'cursor'") + already_indexed = {row[0] for row in rows} + except Exception as exc: + logger.debug("Could not check existing cursor chunks: %s", exc) + + files_processed = 0 + total_chunks = 0 + + for file_path in jsonl_files: + if str(file_path) in already_indexed: + logger.debug("Skipping already-indexed %s", file_path.name) + continue + try: + total_chunks += ingest_cursor_session( + file_path, + db_path=db_path, + project_override=project_override, + dry_run=dry_run, + verbose=verbose, + ) + files_processed += 1 + except Exception as exc: + logger.warning("Failed to ingest %s: %s", file_path.name, exc) + + return files_processed, total_chunks diff --git a/src/brainlayer/ingest/gemini.py b/src/brainlayer/ingest/gemini.py new file mode 100644 index 00000000..cf72c415 --- /dev/null +++ b/src/brainlayer/ingest/gemini.py @@ -0,0 +1,249 @@ +"""Gemini CLI session ingestion adapter for BrainLayer.""" + +from __future__ import annotations + +import json +import logging +from datetime import datetime, timezone +from pathlib import Path +from typing import Iterator, List, Optional + +logger = logging.getLogger(__name__) + +_MIN_USER_LEN = 15 +_MIN_ASSISTANT_LEN = 30 + + +def _extract_text(content) -> str: + if isinstance(content, str): + return content + if isinstance(content, dict): + return str(content.get("text", "")) + if isinstance(content, list): + parts = [] + for block in content: + if isinstance(block, dict): + if "text" in block: + parts.append(str(block.get("text", ""))) + elif block.get("type") == "text": + parts.append(str(block.get("text", ""))) + elif isinstance(block, str): + parts.append(block) + return "\n".join(part for part in parts if part) + return "" + + +def _extract_project_from_path(file_path: Path) -> Optional[str]: + parts = file_path.parts + if "tmp" in parts: + idx = parts.index("tmp") + 1 + if idx < len(parts): + return parts[idx] + return None + + +def _file_mtime_iso(file_path: Path) -> str: + return 
datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc).isoformat() + + +def parse_gemini_session(file_path: Path) -> Iterator[dict]: + """Parse a Gemini session JSON into normalized entries.""" + with open(file_path) as fh: + payload = json.load(fh) + + session_id = payload.get("sessionId") or file_path.stem + project = _extract_project_from_path(file_path) + session_start = payload.get("startTime") or _file_mtime_iso(file_path) + + for message in payload.get("messages") or []: + message_type = message.get("type") + if message_type not in {"user", "gemini"}: + continue + + text = _extract_text(message.get("content")).strip() + if not text: + continue + + if message_type == "user": + if len(text) < _MIN_USER_LEN: + continue + yield { + "content": text, + "content_type": "user_message", + "session_id": session_id, + "timestamp": message.get("timestamp") or session_start, + "project": project, + "source": "gemini", + "metadata": { + "session_id": session_id, + "sender": "user", + "source_file": str(file_path), + }, + } + continue + + ctype = "ai_code" if "```" in text else "assistant_text" + if ctype != "ai_code" and len(text) < _MIN_ASSISTANT_LEN: + continue + yield { + "content": text, + "content_type": ctype, + "session_id": session_id, + "timestamp": message.get("timestamp") or session_start, + "project": project, + "source": "gemini", + "metadata": { + "session_id": session_id, + "sender": "assistant", + "model": message.get("model"), + "source_file": str(file_path), + }, + } + + +def ingest_gemini_session( + file_path: Path, + db_path: Optional[Path] = None, + project_override: Optional[str] = None, + dry_run: bool = False, + verbose: bool = False, +) -> int: + """Ingest a single Gemini session JSON file.""" + from ..index_new import index_chunks_to_sqlite + from ..pipeline.chunk import chunk_content + from ..pipeline.classify import ClassifiedContent, ContentType, ContentValue + + type_map = { + "user_message": (ContentType.USER_MESSAGE, 
ContentValue.HIGH), + "assistant_text": (ContentType.ASSISTANT_TEXT, ContentValue.MEDIUM), + "ai_code": (ContentType.AI_CODE, ContentValue.HIGH), + } + + all_chunks: List = [] + project = project_override + session_id: Optional[str] = None + session_ts: Optional[str] = None + + for entry in parse_gemini_session(file_path): + if project is None: + project = entry.get("project") + if session_id is None: + session_id = entry.get("session_id") + if session_ts is None: + session_ts = entry.get("timestamp") + + content_type, value = type_map.get(entry["content_type"], (ContentType.ASSISTANT_TEXT, ContentValue.MEDIUM)) + classified = ClassifiedContent( + content=entry["content"], + content_type=content_type, + value=value, + metadata={**entry.get("metadata", {}), "source": "gemini"}, + ) + chunks = chunk_content(classified) + all_chunks.extend(chunks) + if verbose: + print(f" [{entry['content_type']}] {entry['content'][:80]!r}") + + if not all_chunks: + logger.info("No indexable content in %s", file_path) + return 0 + + if dry_run: + print(f"Dry run: {len(all_chunks)} chunks from {file_path.name} (not stored)") + return len(all_chunks) + + if db_path is None: + from ..paths import DEFAULT_DB_PATH + + db_path = DEFAULT_DB_PATH + + for chunk in all_chunks: + chunk.metadata.setdefault("source", "gemini") + + indexed = index_chunks_to_sqlite( + all_chunks, + source_file=str(file_path), + project=project, + db_path=db_path, + created_at=session_ts, + ) + + if session_id: + try: + from ..vector_store import VectorStore + + with VectorStore(db_path) as store: + store.store_session_context( + session_id=session_id, + project=project or "gemini", + started_at=session_ts, + ended_at=None, + ) + except Exception as exc: + logger.debug("Could not store session context for %s: %s", session_id, exc) + + logger.info("Indexed %d chunks from %s (session %s, project %s)", indexed, file_path.name, session_id, project) + return indexed + + +def ingest_gemini_dir( + sessions_dir: 
Optional[Path] = None, + db_path: Optional[Path] = None, + project_override: Optional[str] = None, + since_days: Optional[int] = None, + dry_run: bool = False, + verbose: bool = False, +) -> tuple[int, int]: + """Ingest Gemini session files under ~/.gemini/tmp.""" + if sessions_dir is None: + sessions_dir = Path.home() / ".gemini" / "tmp" + + if not sessions_dir.exists(): + raise FileNotFoundError(f"Gemini sessions directory not found: {sessions_dir}") + + session_files = sorted(sessions_dir.glob("**/chats/session-*.json")) + if since_days is not None: + cutoff = datetime.now(timezone.utc).timestamp() - since_days * 86400 + session_files = [f for f in session_files if f.stat().st_mtime >= cutoff] + + if not session_files: + logger.info("No Gemini session files found in %s", sessions_dir) + return 0, 0 + + if not dry_run and db_path is None: + from ..paths import DEFAULT_DB_PATH + + db_path = DEFAULT_DB_PATH + + already_indexed: set[str] = set() + if not dry_run and db_path and db_path.exists(): + try: + from ..vector_store import VectorStore + + with VectorStore(db_path) as store: + cursor = store._read_cursor() + rows = cursor.execute("SELECT DISTINCT source_file FROM chunks WHERE source = 'gemini'") + already_indexed = {row[0] for row in rows} + except Exception as exc: + logger.debug("Could not check existing gemini chunks: %s", exc) + + files_processed = 0 + total_chunks = 0 + + for file_path in session_files: + if str(file_path) in already_indexed: + logger.debug("Skipping already-indexed %s", file_path.name) + continue + try: + total_chunks += ingest_gemini_session( + file_path, + db_path=db_path, + project_override=project_override, + dry_run=dry_run, + verbose=verbose, + ) + files_processed += 1 + except Exception as exc: + logger.warning("Failed to ingest %s: %s", file_path.name, exc) + + return files_processed, total_chunks diff --git a/tests/test_agent_ingest_launchd.py b/tests/test_agent_ingest_launchd.py new file mode 100644 index 00000000..b959ced8 
# --- tests/test_agent_ingest_launchd.py -------------------------------------
"""Tests for the multi-agent ingest launchd wiring."""

import xml.etree.ElementTree as ET
from pathlib import Path


def test_agent_ingest_plist_has_correct_label_and_command():
    repo_root = Path(__file__).parent.parent
    tree = ET.parse(repo_root / "scripts" / "launchd" / "com.brainlayer.agent-ingest.plist")

    # Gather every <string> value: label, binary placeholder and subcommand
    # must all appear in the plist.
    values = [node.text for node in tree.getroot().iter("string") if node.text]

    assert "com.brainlayer.agent-ingest" in values
    assert "__BRAINLAYER_BIN__" in values
    assert "watch-agents" in values


def test_launchd_installer_mentions_agent_ingest():
    script = (Path(__file__).parent.parent / "scripts" / "launchd" / "install.sh").read_text()

    assert "install_plist agent-ingest" in script
    assert "remove_plist agent-ingest" in script


# --- tests/test_agent_session_watcher.py ------------------------------------
"""Tests for the multi-agent session watcher."""

import json

from brainlayer.agent_watch import AgentSessionRegistry, AgentSessionSource, AgentSessionWatcher


def _write_jsonl(path: Path, payload: dict) -> None:
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(payload) + "\n")


def _append_jsonl(path: Path, payload: dict) -> None:
    with open(path, "a") as fh:
        fh.write(json.dumps(payload) + "\n")


class TestAgentSessionRegistry:
    def test_round_trips_file_state(self, tmp_path):
        reg_file = tmp_path / "registry.json"
        registry = AgentSessionRegistry(reg_file)
        registry.set("/tmp/session.jsonl", mtime_ns=123, size=456)
        registry.flush()

        # A fresh instance reads back exactly what was flushed.
        assert AgentSessionRegistry(reg_file).get("/tmp/session.jsonl") == {
            "mtime_ns": 123,
            "size": 456,
        }


class TestAgentSessionWatcher:
    def test_polls_multiple_sources_and_skips_unchanged_files(self, tmp_path):
        codex_file = tmp_path / "codex" / "sessions" / "2026" / "04" / "18" / "session-a.jsonl"
        cursor_file = (
            tmp_path / "cursor" / "projects" / "brainlayer" / "agent-transcripts" / "abc" / "abc.jsonl"
        )
        gemini_file = tmp_path / "gemini" / "tmp" / "brainlayer" / "chats" / "session-a.json"

        _write_jsonl(codex_file, {"type": "session_meta"})
        _write_jsonl(
            cursor_file,
            {"role": "user", "message": {"content": [{"type": "text", "text": "hello world"}]}},
        )
        gemini_file.parent.mkdir(parents=True, exist_ok=True)
        gemini_file.write_text(json.dumps({"sessionId": "gem-1", "messages": []}))

        calls: list[tuple[str, str]] = []

        def _recorder(source_name: str):
            def _ingest(path: Path) -> int:
                calls.append((source_name, str(path)))
                return 1

            return _ingest

        watcher = AgentSessionWatcher(
            registry_path=tmp_path / "agent-registry.json",
            sources=[
                AgentSessionSource(
                    "codex_cli", ["**/*.jsonl"], _recorder("codex_cli"), root=tmp_path / "codex" / "sessions"
                ),
                AgentSessionSource(
                    "cursor", ["**/*.jsonl"], _recorder("cursor"), root=tmp_path / "cursor" / "projects"
                ),
                AgentSessionSource(
                    "gemini", ["**/session-*.json"], _recorder("gemini"), root=tmp_path / "gemini" / "tmp"
                ),
            ],
        )

        # First poll ingests all three files; the second sees them unchanged.
        assert watcher.poll_once() == 3
        assert len(calls) == 3
        assert watcher.poll_once() == 0
        assert len(calls) == 3

        # Growing the codex transcript triggers exactly one re-ingest.
        _append_jsonl(codex_file, {"type": "event_msg", "payload": {"type": "user_message"}})
        assert watcher.poll_once() == 1
        assert calls[-1][0] == "codex_cli"


# --- tests/test_ingest_cursor.py --------------------------------------------
"""Tests for the Cursor session ingestion adapter."""

from brainlayer.ingest.cursor import parse_cursor_session


def _write_records(path: Path, records: list[dict]) -> None:
    with open(path, "w") as fh:
        fh.writelines(json.dumps(record) + "\n" for record in records)


def _cursor_msg(role: str, text: str) -> dict:
    return {"role": role, "message": {"content": [{"type": "text", "text": text}]}}


def _session_file(tmp_path: Path, name: str) -> Path:
    session_dir = tmp_path / ".cursor" / "projects" / "brainlayer" / "agent-transcripts" / name
    session_dir.mkdir(parents=True)
    return session_dir / f"{name}.jsonl"


class TestParseCursorSession:
    def test_extracts_user_query_and_assistant_reply(self, tmp_path):
        session_file = _session_file(tmp_path, "abc123")
        _write_records(
            session_file,
            [
                _cursor_msg("user", "\nWire the missing cursor ingestion adapter.\n"),
                _cursor_msg(
                    "assistant",
                    "I will inspect the parser wiring and add the missing adapter with tests.",
                ),
            ],
        )

        entries = list(parse_cursor_session(session_file))

        assert len(entries) == 2
        user_entry, assistant_entry = entries
        assert user_entry["content_type"] == "user_message"
        assert user_entry["content"] == "Wire the missing cursor ingestion adapter."
        assert user_entry["source"] == "cursor"
        assert user_entry["project"] == "brainlayer"
        assert user_entry["session_id"] == "abc123"

        assert assistant_entry["content_type"] == "assistant_text"
        assert assistant_entry["source"] == "cursor"
        assert assistant_entry["project"] == "brainlayer"

    def test_classifies_code_block_as_ai_code(self, tmp_path):
        session_file = _session_file(tmp_path, "code-session")
        _write_records(
            session_file,
            [
                _cursor_msg(
                    "assistant",
                    "Here is the parser update:\n```python\n"
                    "def parse_cursor_session(path):\n return []\n```\n",
                )
            ],
        )

        entries = list(parse_cursor_session(session_file))

        assert len(entries) == 1
        assert entries[0]["content_type"] == "ai_code"

    def test_skips_empty_and_short_messages(self, tmp_path):
        session_file = _session_file(tmp_path, "skip-session")
        _write_records(
            session_file,
            [
                _cursor_msg("user", "ok"),
                _cursor_msg("assistant", "short"),
            ],
        )

        assert list(parse_cursor_session(session_file)) == []
"sessionId": "gem-session-1", + "startTime": "2026-04-18T02:02:10.184Z", + "messages": [ + {"id": "info-1", "timestamp": "2026-04-18T02:02:10.300Z", "type": "info", "content": "skip me"}, + { + "id": "user-1", + "timestamp": "2026-04-18T02:02:11.000Z", + "type": "user", + "content": [{"text": "Backfill the missing Gemini sessions into BrainLayer."}], + }, + { + "id": "assistant-1", + "timestamp": "2026-04-18T02:02:12.000Z", + "type": "gemini", + "content": "I will inspect the session format and backfill in batches.", + }, + ], + }, + ) + + entries = list(parse_gemini_session(session_file)) + + assert len(entries) == 2 + assert entries[0]["content_type"] == "user_message" + assert entries[0]["source"] == "gemini" + assert entries[0]["project"] == "brainlayer" + assert entries[0]["session_id"] == "gem-session-1" + assert entries[0]["timestamp"] == "2026-04-18T02:02:11.000Z" + + assert entries[1]["content_type"] == "assistant_text" + assert entries[1]["source"] == "gemini" + assert entries[1]["project"] == "brainlayer" + + def test_classifies_code_block_as_ai_code(self, tmp_path): + session_dir = tmp_path / ".gemini" / "tmp" / "brainlayer" / "chats" + session_dir.mkdir(parents=True) + session_file = session_dir / "session-code.json" + _write_json( + session_file, + { + "sessionId": "gem-session-2", + "startTime": "2026-04-18T02:02:10.184Z", + "messages": [ + { + "id": "assistant-1", + "timestamp": "2026-04-18T02:02:12.000Z", + "type": "gemini", + "content": "```python\nprint('hello')\n```", + } + ], + }, + ) + + entries = list(parse_gemini_session(session_file)) + + assert len(entries) == 1 + assert entries[0]["content_type"] == "ai_code"