diff --git a/scripts/launchd/com.brainlayer.agent-ingest.plist b/scripts/launchd/com.brainlayer.agent-ingest.plist
new file mode 100644
index 00000000..d857355b
--- /dev/null
+++ b/scripts/launchd/com.brainlayer.agent-ingest.plist
@@ -0,0 +1,44 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!DOCTYPE plist PUBLIC "-//Apple//DTD PLIST 1.0//EN" "http://www.apple.com/DTDs/PropertyList-1.0.dtd">
+<plist version="1.0">
+<dict>
+    <key>Label</key>
+    <string>com.brainlayer.agent-ingest</string>
+
+    <key>ProgramArguments</key>
+    <array>
+        <string>__BRAINLAYER_BIN__</string>
+        <string>watch-agents</string>
+        <string>--poll</string>
+        <string>30.0</string>
+    </array>
+
+    <key>StandardOutPath</key>
+    <string>__HOME__/.local/share/brainlayer/logs/agent-ingest.log</string>
+    <key>StandardErrorPath</key>
+    <string>__HOME__/.local/share/brainlayer/logs/agent-ingest.err</string>
+
+    <key>EnvironmentVariables</key>
+    <dict>
+        <key>PATH</key>
+        <string>/usr/local/bin:/usr/bin:/bin:__HOME__/.local/bin</string>
+        <key>PYTHONUNBUFFERED</key>
+        <string>1</string>
+    </dict>
+
+    <key>KeepAlive</key>
+    <true/>
+
+    <key>RunAtLoad</key>
+    <true/>
+
+    <key>ThrottleInterval</key>
+    <integer>5</integer>
+
+    <key>Nice</key>
+    <integer>10</integer>
+
+    <key>ProcessType</key>
+    <string>Background</string>
+</dict>
+</plist>
diff --git a/scripts/launchd/install.sh b/scripts/launchd/install.sh
index 205bcf4a..2f590fd6 100755
--- a/scripts/launchd/install.sh
+++ b/scripts/launchd/install.sh
@@ -6,6 +6,7 @@
# ./scripts/launchd/install.sh index # Install indexing only
# ./scripts/launchd/install.sh enrich # Install enrichment only
# ./scripts/launchd/install.sh decay # Install decay only
+# ./scripts/launchd/install.sh agent-ingest # Install multi-agent ingest watcher
# ./scripts/launchd/install.sh load enrichment
# ./scripts/launchd/install.sh unload enrichment
# ./scripts/launchd/install.sh checkpoint # Install WAL checkpoint only
@@ -118,6 +119,9 @@ case "${1:-all}" in
decay)
install_plist decay
;;
+ agent-ingest)
+ install_plist agent-ingest
+ ;;
load)
load_plist "${2:-enrichment}"
;;
@@ -129,6 +133,7 @@ case "${1:-all}" in
;;
all)
install_plist index
+ install_plist agent-ingest
install_plist enrichment
install_plist decay
install_plist wal-checkpoint
@@ -137,13 +142,14 @@ case "${1:-all}" in
;;
remove)
remove_plist index
+ remove_plist agent-ingest 2>/dev/null || true
remove_plist enrich 2>/dev/null || true
remove_plist enrichment 2>/dev/null || true
remove_plist decay 2>/dev/null || true
remove_plist wal-checkpoint
;;
*)
- echo "Usage: $0 [index|enrich|enrichment|decay|load [name]|unload [name]|checkpoint|all|remove]"
+ echo "Usage: $0 [index|agent-ingest|enrich|enrichment|decay|load [name]|unload [name]|checkpoint|all|remove]"
exit 1
;;
esac
diff --git a/src/brainlayer/agent_watch.py b/src/brainlayer/agent_watch.py
new file mode 100644
index 00000000..914bcd14
--- /dev/null
+++ b/src/brainlayer/agent_watch.py
@@ -0,0 +1,139 @@
+"""Polling watcher for Codex, Cursor, and Gemini session artifacts."""
+
+from __future__ import annotations
+
+import json
+import logging
+import os
+import tempfile
+import threading
+import time
+from dataclasses import dataclass
+from pathlib import Path
+from typing import Callable
+
+logger = logging.getLogger(__name__)
+
+
+class AgentSessionRegistry:
+ """Persists mtime/size state for agent session files."""
+
+ def __init__(self, path: str | Path):
+ self.path = Path(path)
+ self._data: dict[str, dict[str, int]] = {}
+ self._dirty = False
+ self._load()
+
+ def _load(self) -> None:
+ try:
+ with open(self.path) as fh:
+ data = json.load(fh)
+ if isinstance(data, dict):
+ self._data = data
+ except (OSError, json.JSONDecodeError, ValueError):
+ self._data = {}
+
+ def get(self, filepath: str) -> dict[str, int] | None:
+ return self._data.get(filepath)
+
+ def set(self, filepath: str, *, mtime_ns: int, size: int) -> None:
+ self._data[filepath] = {"mtime_ns": mtime_ns, "size": size}
+ self._dirty = True
+
+ def flush(self) -> bool:
+ if not self._dirty:
+ return True
+
+ tmp_path = None
+ try:
+ self.path.parent.mkdir(parents=True, exist_ok=True)
+ fd, tmp_path = tempfile.mkstemp(dir=str(self.path.parent), suffix=".tmp")
+ with os.fdopen(fd, "w") as fh:
+ json.dump(self._data, fh)
+ os.rename(tmp_path, self.path)
+ self._dirty = False
+ return True
+ except OSError as exc:
+ logger.warning("Failed to flush agent registry: %s", exc)
+ if tmp_path:
+ try:
+ os.unlink(tmp_path)
+ except OSError:
+ pass
+ return False
+
+
+@dataclass(frozen=True)
+class AgentSessionSource:
+ name: str
+ patterns: list[str]
+ ingest: Callable[[Path], int]
+ root: Path
+
+
+class AgentSessionWatcher:
+ """Poll source roots and ingest files whose size/mtime changed."""
+
+ def __init__(
+ self,
+ registry_path: str | Path,
+ sources: list[AgentSessionSource],
+ poll_interval_s: float = 30.0,
+ registry_flush_interval_s: float = 5.0,
+ ):
+ self.registry = AgentSessionRegistry(registry_path)
+ self.sources = sources
+ self.poll_interval_s = poll_interval_s
+ self.registry_flush_interval_s = registry_flush_interval_s
+ self._stop = threading.Event()
+ self._last_registry_flush = time.monotonic()
+
+ def _discover_files(self, source: AgentSessionSource) -> list[Path]:
+ files: list[Path] = []
+ if not source.root.exists():
+ return files
+ for pattern in source.patterns:
+ try:
+ files.extend(path for path in source.root.glob(pattern) if path.is_file())
+ except OSError:
+ continue
+ return sorted(set(files))
+
+ def poll_once(self) -> int:
+ processed = 0
+ for source in self.sources:
+ for file_path in self._discover_files(source):
+ try:
+ stat = file_path.stat()
+ except OSError:
+ continue
+
+ state = {"mtime_ns": stat.st_mtime_ns, "size": stat.st_size}
+ previous = self.registry.get(str(file_path))
+ if previous == state:
+ continue
+
+ indexed = source.ingest(file_path)
+ logger.info("Agent ingest %s %s -> %d chunks", source.name, file_path.name, indexed)
+ self.registry.set(str(file_path), **state)
+ processed += 1
+
+ now = time.monotonic()
+ if now - self._last_registry_flush >= self.registry_flush_interval_s:
+ self.registry.flush()
+ self._last_registry_flush = now
+ return processed
+
+ def start(self) -> None:
+ while not self._stop.is_set():
+ try:
+ self.poll_once()
+ except Exception as exc:
+ logger.error("Agent watcher poll failed: %s", exc)
+ self._stop.wait(self.poll_interval_s)
+
+ self.registry.flush()
+
+ def stop(self) -> None:
+ self._stop.set()
+
diff --git a/src/brainlayer/cli/__init__.py b/src/brainlayer/cli/__init__.py
index 198a460d..5cda4cd5 100644
--- a/src/brainlayer/cli/__init__.py
+++ b/src/brainlayer/cli/__init__.py
@@ -1875,6 +1875,182 @@ def ingest_codex(
raise typer.Exit(1)
+@app.command("ingest-cursor")
+def ingest_cursor(
+ path: Optional[Path] = typer.Argument(
+ None,
+ help="Cursor transcript JSONL file or projects directory (default: ~/.cursor/projects)",
+ ),
+ project: str = typer.Option(None, "--project", "-p", help="Override project name"),
+ since_days: int = typer.Option(None, "--since-days", "-d", help="Only process last N days"),
+ dry_run: bool = typer.Option(False, "--dry-run", help="Parse but do not write to DB"),
+ verbose: bool = typer.Option(False, "--verbose", "-v", help="Print each classified entry"),
+) -> None:
+ """Ingest Cursor agent transcripts into BrainLayer."""
+ from ..ingest.cursor import ingest_cursor_dir, ingest_cursor_session
+ from ..paths import DEFAULT_DB_PATH
+
+ db_path = DEFAULT_DB_PATH
+
+ try:
+ if path and path.is_file():
+ rprint(f"[bold blue]זיכרון[/] — Ingesting Cursor session: [bold]{path.name}[/]")
+ with console.status("Indexing..."):
+ n = ingest_cursor_session(
+ path,
+ db_path=db_path,
+ project_override=project,
+ dry_run=dry_run,
+ verbose=verbose,
+ )
+ rprint(f"[bold green]✓[/] Indexed [bold]{n}[/] chunks from {path.name}")
+ else:
+ sessions_root = path if path else None
+ label = str(sessions_root) if sessions_root else "~/.cursor/projects"
+ rprint(f"[bold blue]זיכרון[/] — Ingesting Cursor sessions from: [bold]{label}[/]")
+ with Progress(
+ SpinnerColumn(),
+ TextColumn("[progress.description]{task.description}"),
+ TimeElapsedColumn(),
+ console=console,
+ ) as progress:
+ task = progress.add_task("Scanning sessions...", total=None)
+ files, chunks = ingest_cursor_dir(
+ sessions_dir=sessions_root,
+ db_path=db_path,
+ project_override=project,
+ since_days=since_days,
+ dry_run=dry_run,
+ verbose=verbose,
+ )
+ progress.update(task, description=f"Done — {files} files, {chunks:,} chunks")
+ tag = " [dim](dry run)[/]" if dry_run else ""
+ rprint(f"[bold green]✓[/] Processed [bold]{files}[/] session files, [bold]{chunks:,}[/] chunks{tag}")
+ except FileNotFoundError as e:
+ rprint(f"[bold red]Error:[/] {e}")
+ raise typer.Exit(1)
+ except Exception as e:
+ rprint(f"[bold red]Error:[/] {e}")
+ raise typer.Exit(1)
+
+
+@app.command("ingest-gemini")
+def ingest_gemini(
+ path: Optional[Path] = typer.Argument(
+ None,
+ help="Gemini session JSON file or root directory (default: ~/.gemini/tmp)",
+ ),
+ project: str = typer.Option(None, "--project", "-p", help="Override project name"),
+ since_days: int = typer.Option(None, "--since-days", "-d", help="Only process last N days"),
+ dry_run: bool = typer.Option(False, "--dry-run", help="Parse but do not write to DB"),
+ verbose: bool = typer.Option(False, "--verbose", "-v", help="Print each classified entry"),
+) -> None:
+ """Ingest Gemini session transcripts into BrainLayer."""
+ from ..ingest.gemini import ingest_gemini_dir, ingest_gemini_session
+ from ..paths import DEFAULT_DB_PATH
+
+ db_path = DEFAULT_DB_PATH
+
+ try:
+ if path and path.is_file():
+ rprint(f"[bold blue]זיכרון[/] — Ingesting Gemini session: [bold]{path.name}[/]")
+ with console.status("Indexing..."):
+ n = ingest_gemini_session(
+ path,
+ db_path=db_path,
+ project_override=project,
+ dry_run=dry_run,
+ verbose=verbose,
+ )
+ rprint(f"[bold green]✓[/] Indexed [bold]{n}[/] chunks from {path.name}")
+ else:
+ sessions_root = path if path else None
+ label = str(sessions_root) if sessions_root else "~/.gemini/tmp"
+ rprint(f"[bold blue]זיכרון[/] — Ingesting Gemini sessions from: [bold]{label}[/]")
+ with Progress(
+ SpinnerColumn(),
+ TextColumn("[progress.description]{task.description}"),
+ TimeElapsedColumn(),
+ console=console,
+ ) as progress:
+ task = progress.add_task("Scanning sessions...", total=None)
+ files, chunks = ingest_gemini_dir(
+ sessions_dir=sessions_root,
+ db_path=db_path,
+ project_override=project,
+ since_days=since_days,
+ dry_run=dry_run,
+ verbose=verbose,
+ )
+ progress.update(task, description=f"Done — {files} files, {chunks:,} chunks")
+ tag = " [dim](dry run)[/]" if dry_run else ""
+ rprint(f"[bold green]✓[/] Processed [bold]{files}[/] session files, [bold]{chunks:,}[/] chunks{tag}")
+ except FileNotFoundError as e:
+ rprint(f"[bold red]Error:[/] {e}")
+ raise typer.Exit(1)
+ except Exception as e:
+ rprint(f"[bold red]Error:[/] {e}")
+ raise typer.Exit(1)
+
+
+@app.command("watch-agents")
+def watch_agents(
+ poll_interval: float = typer.Option(30.0, "--poll", help="Poll interval in seconds"),
+) -> None:
+ """Watch Codex, Cursor, and Gemini session roots and ingest changed files."""
+ import signal
+
+ from ..agent_watch import AgentSessionSource, AgentSessionWatcher
+ from ..ingest.codex import ingest_codex_session
+ from ..ingest.cursor import ingest_cursor_session
+ from ..ingest.gemini import ingest_gemini_session
+ from ..paths import get_db_path
+
+ db_path = get_db_path()
+ registry_path = db_path.parent / "agent-session-registry.json"
+
+ watcher = AgentSessionWatcher(
+ registry_path=registry_path,
+ poll_interval_s=poll_interval,
+ sources=[
+ AgentSessionSource(
+ name="codex_cli",
+ patterns=["**/*.jsonl"],
+ ingest=lambda path: ingest_codex_session(path, db_path=db_path),
+ root=Path.home() / ".codex" / "sessions",
+ ),
+ AgentSessionSource(
+ name="cursor",
+ patterns=["**/agent-transcripts/**/*.jsonl"],
+ ingest=lambda path: ingest_cursor_session(path, db_path=db_path),
+ root=Path.home() / ".cursor" / "projects",
+ ),
+ AgentSessionSource(
+ name="gemini",
+ patterns=["**/chats/session-*.json"],
+ ingest=lambda path: ingest_gemini_session(path, db_path=db_path),
+ root=Path.home() / ".gemini" / "tmp",
+ ),
+ ],
+ )
+
+ rprint("[bold blue]זיכרון[/] Multi-agent session watcher")
+ rprint(f" Registry: [bold]{registry_path}[/]")
+ rprint(f" Poll: {poll_interval}s")
+ rprint(" Sources: codex_cli, cursor, gemini")
+ rprint()
+
+ def handle_signal(signum, frame):
+ rprint("\n[bold yellow]Stopping multi-agent watcher...[/]")
+ watcher.stop()
+
+ signal.signal(signal.SIGTERM, handle_signal)
+ signal.signal(signal.SIGINT, handle_signal)
+
+ watcher.start()
+ rprint("[bold green]Done.[/]")
+
+
@app.command("analyze-semantic")
def analyze_semantic(
whatsapp_limit: int = typer.Option(5000, "--whatsapp-limit", "-w", help="Number of WhatsApp messages to analyze"),
diff --git a/src/brainlayer/index_new.py b/src/brainlayer/index_new.py
index 80521273..6b9866ab 100644
--- a/src/brainlayer/index_new.py
+++ b/src/brainlayer/index_new.py
@@ -20,6 +20,7 @@ def index_chunks_to_sqlite(
project: Optional[str] = None,
db_path: Path = DEFAULT_DB_PATH,
on_progress: Optional[Callable[[int, int], None]] = None,
+ created_at: Optional[str] = None,
) -> int:
"""Index chunks to sqlite-vec database."""
if not chunks:
@@ -44,22 +45,22 @@ def index_chunks_to_sqlite(
if not embedded_chunks:
return 0
- # Try to get timestamp from source file (first JSONL message)
- created_at = None
- try:
- import json as _json
-
- with open(source_file) as _f:
- for _line in _f:
- _line = _line.strip()
- if not _line:
- continue
- _data = _json.loads(_line)
- if "timestamp" in _data:
- created_at = _data["timestamp"]
- break
- except Exception as e:
- logger.debug("Could not extract timestamp from %s: %s", source_file, e)
+ if not created_at:
+ # Try to get timestamp from source file (first JSONL message)
+ try:
+ import json as _json
+
+ with open(source_file) as _f:
+ for _line in _f:
+ _line = _line.strip()
+ if not _line:
+ continue
+ _data = _json.loads(_line)
+ if "timestamp" in _data:
+ created_at = _data["timestamp"]
+ break
+ except Exception as e:
+ logger.debug("Could not extract timestamp from %s: %s", source_file, e)
if not created_at:
from datetime import datetime, timezone
diff --git a/src/brainlayer/ingest/cursor.py b/src/brainlayer/ingest/cursor.py
new file mode 100644
index 00000000..acd9cd16
--- /dev/null
+++ b/src/brainlayer/ingest/cursor.py
@@ -0,0 +1,275 @@
+"""Cursor session ingestion adapter for BrainLayer."""
+
+from __future__ import annotations
+
+import json
+import logging
+import re
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Iterator, List, Optional
+
+logger = logging.getLogger(__name__)
+
+_MIN_USER_LEN = 15
+_MIN_ASSISTANT_LEN = 30
+
+_USER_QUERY_RE = re.compile(r"^\s*<user_query>\s*(.*?)\s*</user_query>\s*$", re.DOTALL)
+
+
+def _extract_text(content) -> str:
+ """Extract text content from Cursor message blocks."""
+ if isinstance(content, str):
+ return content
+ if isinstance(content, dict):
+ return str(content.get("text", ""))
+ if isinstance(content, list):
+ parts = []
+ for block in content:
+ if isinstance(block, dict) and block.get("type") == "text":
+ parts.append(block.get("text", ""))
+ elif isinstance(block, str):
+ parts.append(block)
+ return "\n".join(part for part in parts if part)
+ return ""
+
+
+def _normalize_project_name(raw: str) -> str:
+ """Convert encoded path-like project names to the repo leaf."""
+ if not raw:
+ return raw
+
+ for marker in ("-Gits-", "-Desktop-", "-projects-", "-config-"):
+ if marker in raw:
+ return raw.split(marker)[-1]
+
+ return raw
+
+
+def _extract_project_from_path(file_path: Path) -> Optional[str]:
+ parts = file_path.parts
+ if "projects" in parts:
+ idx = parts.index("projects") + 1
+ if idx < len(parts):
+ return _normalize_project_name(parts[idx])
+ return None
+
+
+def _file_mtime_iso(file_path: Path) -> str:
+ return datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc).isoformat()
+
+
+def _clean_user_text(text: str) -> str:
+ stripped = text.strip()
+ match = _USER_QUERY_RE.match(stripped)
+ if match:
+ return match.group(1).strip()
+ return stripped
+
+
+def parse_cursor_session(file_path: Path) -> Iterator[dict]:
+ """Parse a Cursor agent transcript JSONL into normalized entries."""
+ session_id = file_path.stem
+ project = _extract_project_from_path(file_path)
+ fallback_ts = _file_mtime_iso(file_path)
+
+ with open(file_path, "rb") as fh:
+ for raw in fh:
+ raw = raw.strip()
+ if not raw:
+ continue
+ try:
+ line = json.loads(raw)
+ except (json.JSONDecodeError, ValueError):
+ continue
+
+ role = line.get("role")
+ message = line.get("message") or {}
+ text = _extract_text(message.get("content")).strip()
+ if not text:
+ continue
+
+ if role == "user":
+ text = _clean_user_text(text)
+ if len(text) < _MIN_USER_LEN:
+ continue
+ yield {
+ "content": text,
+ "content_type": "user_message",
+ "session_id": session_id,
+ "timestamp": fallback_ts,
+ "project": project,
+ "source": "cursor",
+ "metadata": {
+ "session_id": session_id,
+ "sender": "user",
+ "source_file": str(file_path),
+ },
+ }
+ continue
+
+ if role == "assistant":
+ ctype = "ai_code" if "```" in text else "assistant_text"
+ if ctype != "ai_code" and len(text) < _MIN_ASSISTANT_LEN:
+ continue
+ yield {
+ "content": text,
+ "content_type": ctype,
+ "session_id": session_id,
+ "timestamp": fallback_ts,
+ "project": project,
+ "source": "cursor",
+ "metadata": {
+ "session_id": session_id,
+ "sender": "assistant",
+ "source_file": str(file_path),
+ },
+ }
+
+
+def ingest_cursor_session(
+ file_path: Path,
+ db_path: Optional[Path] = None,
+ project_override: Optional[str] = None,
+ dry_run: bool = False,
+ verbose: bool = False,
+) -> int:
+ """Ingest a single Cursor session transcript."""
+ from ..index_new import index_chunks_to_sqlite
+ from ..pipeline.chunk import chunk_content
+ from ..pipeline.classify import ClassifiedContent, ContentType, ContentValue
+
+ type_map = {
+ "user_message": (ContentType.USER_MESSAGE, ContentValue.HIGH),
+ "assistant_text": (ContentType.ASSISTANT_TEXT, ContentValue.MEDIUM),
+ "ai_code": (ContentType.AI_CODE, ContentValue.HIGH),
+ }
+
+ all_chunks: List = []
+ project = project_override
+ session_id: Optional[str] = None
+ session_ts: Optional[str] = None
+
+ for entry in parse_cursor_session(file_path):
+ if project is None:
+ project = entry.get("project")
+ if session_id is None:
+ session_id = entry.get("session_id")
+ if session_ts is None:
+ session_ts = entry.get("timestamp")
+
+ content_type, value = type_map.get(entry["content_type"], (ContentType.ASSISTANT_TEXT, ContentValue.MEDIUM))
+ classified = ClassifiedContent(
+ content=entry["content"],
+ content_type=content_type,
+ value=value,
+ metadata={**entry.get("metadata", {}), "source": "cursor"},
+ )
+ chunks = chunk_content(classified)
+ all_chunks.extend(chunks)
+ if verbose:
+ print(f" [{entry['content_type']}] {entry['content'][:80]!r}")
+
+ if not all_chunks:
+ logger.info("No indexable content in %s", file_path)
+ return 0
+
+ if dry_run:
+ print(f"Dry run: {len(all_chunks)} chunks from {file_path.name} (not stored)")
+ return len(all_chunks)
+
+ if db_path is None:
+ from ..paths import DEFAULT_DB_PATH
+
+ db_path = DEFAULT_DB_PATH
+
+ for chunk in all_chunks:
+ chunk.metadata.setdefault("source", "cursor")
+
+ indexed = index_chunks_to_sqlite(
+ all_chunks,
+ source_file=str(file_path),
+ project=project,
+ db_path=db_path,
+ created_at=session_ts,
+ )
+
+ if session_id:
+ try:
+ from ..vector_store import VectorStore
+
+ with VectorStore(db_path) as store:
+ store.store_session_context(
+ session_id=session_id,
+ project=project or "cursor",
+ started_at=session_ts,
+ ended_at=None,
+ )
+ except Exception as exc:
+ logger.debug("Could not store session context for %s: %s", session_id, exc)
+
+ logger.info("Indexed %d chunks from %s (session %s, project %s)", indexed, file_path.name, session_id, project)
+ return indexed
+
+
+def ingest_cursor_dir(
+ sessions_dir: Optional[Path] = None,
+ db_path: Optional[Path] = None,
+ project_override: Optional[str] = None,
+ since_days: Optional[int] = None,
+ dry_run: bool = False,
+ verbose: bool = False,
+) -> tuple[int, int]:
+ """Ingest Cursor agent transcript files under ~/.cursor/projects."""
+ if sessions_dir is None:
+ sessions_dir = Path.home() / ".cursor" / "projects"
+
+ if not sessions_dir.exists():
+ raise FileNotFoundError(f"Cursor projects directory not found: {sessions_dir}")
+
+ jsonl_files = sorted(sessions_dir.glob("**/agent-transcripts/**/*.jsonl"))
+ if since_days is not None:
+ cutoff = datetime.now(timezone.utc).timestamp() - since_days * 86400
+ jsonl_files = [f for f in jsonl_files if f.stat().st_mtime >= cutoff]
+
+ if not jsonl_files:
+ logger.info("No Cursor transcript files found in %s", sessions_dir)
+ return 0, 0
+
+ if not dry_run and db_path is None:
+ from ..paths import DEFAULT_DB_PATH
+
+ db_path = DEFAULT_DB_PATH
+
+ already_indexed: set[str] = set()
+ if not dry_run and db_path and db_path.exists():
+ try:
+ from ..vector_store import VectorStore
+
+ with VectorStore(db_path) as store:
+ cursor = store._read_cursor()
+ rows = cursor.execute("SELECT DISTINCT source_file FROM chunks WHERE source = 'cursor'")
+ already_indexed = {row[0] for row in rows}
+ except Exception as exc:
+ logger.debug("Could not check existing cursor chunks: %s", exc)
+
+ files_processed = 0
+ total_chunks = 0
+
+ for file_path in jsonl_files:
+ if str(file_path) in already_indexed:
+ logger.debug("Skipping already-indexed %s", file_path.name)
+ continue
+ try:
+ total_chunks += ingest_cursor_session(
+ file_path,
+ db_path=db_path,
+ project_override=project_override,
+ dry_run=dry_run,
+ verbose=verbose,
+ )
+ files_processed += 1
+ except Exception as exc:
+ logger.warning("Failed to ingest %s: %s", file_path.name, exc)
+
+ return files_processed, total_chunks
diff --git a/src/brainlayer/ingest/gemini.py b/src/brainlayer/ingest/gemini.py
new file mode 100644
index 00000000..cf72c415
--- /dev/null
+++ b/src/brainlayer/ingest/gemini.py
@@ -0,0 +1,249 @@
+"""Gemini CLI session ingestion adapter for BrainLayer."""
+
+from __future__ import annotations
+
+import json
+import logging
+from datetime import datetime, timezone
+from pathlib import Path
+from typing import Iterator, List, Optional
+
+logger = logging.getLogger(__name__)
+
+_MIN_USER_LEN = 15
+_MIN_ASSISTANT_LEN = 30
+
+
+def _extract_text(content) -> str:
+ if isinstance(content, str):
+ return content
+ if isinstance(content, dict):
+ return str(content.get("text", ""))
+ if isinstance(content, list):
+ parts = []
+ for block in content:
+ if isinstance(block, dict):
+ if "text" in block:
+ parts.append(str(block.get("text", "")))
+ elif block.get("type") == "text":
+ parts.append(str(block.get("text", "")))
+ elif isinstance(block, str):
+ parts.append(block)
+ return "\n".join(part for part in parts if part)
+ return ""
+
+
+def _extract_project_from_path(file_path: Path) -> Optional[str]:
+ parts = file_path.parts
+ if "tmp" in parts:
+ idx = parts.index("tmp") + 1
+ if idx < len(parts):
+ return parts[idx]
+ return None
+
+
+def _file_mtime_iso(file_path: Path) -> str:
+ return datetime.fromtimestamp(file_path.stat().st_mtime, timezone.utc).isoformat()
+
+
+def parse_gemini_session(file_path: Path) -> Iterator[dict]:
+ """Parse a Gemini session JSON into normalized entries."""
+ with open(file_path) as fh:
+ payload = json.load(fh)
+
+ session_id = payload.get("sessionId") or file_path.stem
+ project = _extract_project_from_path(file_path)
+ session_start = payload.get("startTime") or _file_mtime_iso(file_path)
+
+ for message in payload.get("messages") or []:
+ message_type = message.get("type")
+ if message_type not in {"user", "gemini"}:
+ continue
+
+ text = _extract_text(message.get("content")).strip()
+ if not text:
+ continue
+
+ if message_type == "user":
+ if len(text) < _MIN_USER_LEN:
+ continue
+ yield {
+ "content": text,
+ "content_type": "user_message",
+ "session_id": session_id,
+ "timestamp": message.get("timestamp") or session_start,
+ "project": project,
+ "source": "gemini",
+ "metadata": {
+ "session_id": session_id,
+ "sender": "user",
+ "source_file": str(file_path),
+ },
+ }
+ continue
+
+ ctype = "ai_code" if "```" in text else "assistant_text"
+ if ctype != "ai_code" and len(text) < _MIN_ASSISTANT_LEN:
+ continue
+ yield {
+ "content": text,
+ "content_type": ctype,
+ "session_id": session_id,
+ "timestamp": message.get("timestamp") or session_start,
+ "project": project,
+ "source": "gemini",
+ "metadata": {
+ "session_id": session_id,
+ "sender": "assistant",
+ "model": message.get("model"),
+ "source_file": str(file_path),
+ },
+ }
+
+
+def ingest_gemini_session(
+ file_path: Path,
+ db_path: Optional[Path] = None,
+ project_override: Optional[str] = None,
+ dry_run: bool = False,
+ verbose: bool = False,
+) -> int:
+ """Ingest a single Gemini session JSON file."""
+ from ..index_new import index_chunks_to_sqlite
+ from ..pipeline.chunk import chunk_content
+ from ..pipeline.classify import ClassifiedContent, ContentType, ContentValue
+
+ type_map = {
+ "user_message": (ContentType.USER_MESSAGE, ContentValue.HIGH),
+ "assistant_text": (ContentType.ASSISTANT_TEXT, ContentValue.MEDIUM),
+ "ai_code": (ContentType.AI_CODE, ContentValue.HIGH),
+ }
+
+ all_chunks: List = []
+ project = project_override
+ session_id: Optional[str] = None
+ session_ts: Optional[str] = None
+
+ for entry in parse_gemini_session(file_path):
+ if project is None:
+ project = entry.get("project")
+ if session_id is None:
+ session_id = entry.get("session_id")
+ if session_ts is None:
+ session_ts = entry.get("timestamp")
+
+ content_type, value = type_map.get(entry["content_type"], (ContentType.ASSISTANT_TEXT, ContentValue.MEDIUM))
+ classified = ClassifiedContent(
+ content=entry["content"],
+ content_type=content_type,
+ value=value,
+ metadata={**entry.get("metadata", {}), "source": "gemini"},
+ )
+ chunks = chunk_content(classified)
+ all_chunks.extend(chunks)
+ if verbose:
+ print(f" [{entry['content_type']}] {entry['content'][:80]!r}")
+
+ if not all_chunks:
+ logger.info("No indexable content in %s", file_path)
+ return 0
+
+ if dry_run:
+ print(f"Dry run: {len(all_chunks)} chunks from {file_path.name} (not stored)")
+ return len(all_chunks)
+
+ if db_path is None:
+ from ..paths import DEFAULT_DB_PATH
+
+ db_path = DEFAULT_DB_PATH
+
+ for chunk in all_chunks:
+ chunk.metadata.setdefault("source", "gemini")
+
+ indexed = index_chunks_to_sqlite(
+ all_chunks,
+ source_file=str(file_path),
+ project=project,
+ db_path=db_path,
+ created_at=session_ts,
+ )
+
+ if session_id:
+ try:
+ from ..vector_store import VectorStore
+
+ with VectorStore(db_path) as store:
+ store.store_session_context(
+ session_id=session_id,
+ project=project or "gemini",
+ started_at=session_ts,
+ ended_at=None,
+ )
+ except Exception as exc:
+ logger.debug("Could not store session context for %s: %s", session_id, exc)
+
+ logger.info("Indexed %d chunks from %s (session %s, project %s)", indexed, file_path.name, session_id, project)
+ return indexed
+
+
+def ingest_gemini_dir(
+ sessions_dir: Optional[Path] = None,
+ db_path: Optional[Path] = None,
+ project_override: Optional[str] = None,
+ since_days: Optional[int] = None,
+ dry_run: bool = False,
+ verbose: bool = False,
+) -> tuple[int, int]:
+ """Ingest Gemini session files under ~/.gemini/tmp."""
+ if sessions_dir is None:
+ sessions_dir = Path.home() / ".gemini" / "tmp"
+
+ if not sessions_dir.exists():
+ raise FileNotFoundError(f"Gemini sessions directory not found: {sessions_dir}")
+
+ session_files = sorted(sessions_dir.glob("**/chats/session-*.json"))
+ if since_days is not None:
+ cutoff = datetime.now(timezone.utc).timestamp() - since_days * 86400
+ session_files = [f for f in session_files if f.stat().st_mtime >= cutoff]
+
+ if not session_files:
+ logger.info("No Gemini session files found in %s", sessions_dir)
+ return 0, 0
+
+ if not dry_run and db_path is None:
+ from ..paths import DEFAULT_DB_PATH
+
+ db_path = DEFAULT_DB_PATH
+
+ already_indexed: set[str] = set()
+ if not dry_run and db_path and db_path.exists():
+ try:
+ from ..vector_store import VectorStore
+
+ with VectorStore(db_path) as store:
+ cursor = store._read_cursor()
+ rows = cursor.execute("SELECT DISTINCT source_file FROM chunks WHERE source = 'gemini'")
+ already_indexed = {row[0] for row in rows}
+ except Exception as exc:
+ logger.debug("Could not check existing gemini chunks: %s", exc)
+
+ files_processed = 0
+ total_chunks = 0
+
+ for file_path in session_files:
+ if str(file_path) in already_indexed:
+ logger.debug("Skipping already-indexed %s", file_path.name)
+ continue
+ try:
+ total_chunks += ingest_gemini_session(
+ file_path,
+ db_path=db_path,
+ project_override=project_override,
+ dry_run=dry_run,
+ verbose=verbose,
+ )
+ files_processed += 1
+ except Exception as exc:
+ logger.warning("Failed to ingest %s: %s", file_path.name, exc)
+
+ return files_processed, total_chunks
diff --git a/tests/test_agent_ingest_launchd.py b/tests/test_agent_ingest_launchd.py
new file mode 100644
index 00000000..b959ced8
--- /dev/null
+++ b/tests/test_agent_ingest_launchd.py
@@ -0,0 +1,23 @@
+"""Tests for the multi-agent ingest launchd wiring."""
+
+from pathlib import Path
+import xml.etree.ElementTree as ET
+
+
+def test_agent_ingest_plist_has_correct_label_and_command():
+ plist_path = Path(__file__).parent.parent / "scripts" / "launchd" / "com.brainlayer.agent-ingest.plist"
+ tree = ET.parse(plist_path)
+ root = tree.getroot()
+
+ strings = [elem.text for elem in root.iter("string") if elem.text]
+
+ assert "com.brainlayer.agent-ingest" in strings
+ assert "__BRAINLAYER_BIN__" in strings
+ assert "watch-agents" in strings
+
+
+def test_launchd_installer_mentions_agent_ingest():
+ install_script = (Path(__file__).parent.parent / "scripts" / "launchd" / "install.sh").read_text()
+
+ assert "install_plist agent-ingest" in install_script
+ assert "remove_plist agent-ingest" in install_script
diff --git a/tests/test_agent_session_watcher.py b/tests/test_agent_session_watcher.py
new file mode 100644
index 00000000..ff80ba41
--- /dev/null
+++ b/tests/test_agent_session_watcher.py
@@ -0,0 +1,72 @@
+"""Tests for the multi-agent session watcher."""
+
+import json
+from pathlib import Path
+
+from brainlayer.agent_watch import AgentSessionRegistry, AgentSessionSource, AgentSessionWatcher
+
+
+def _write_jsonl(path: Path, payload: dict) -> None:
+ path.parent.mkdir(parents=True, exist_ok=True)
+ with open(path, "w") as f:
+ f.write(json.dumps(payload) + "\n")
+
+
+def _append_jsonl(path: Path, payload: dict) -> None:
+ with open(path, "a") as f:
+ f.write(json.dumps(payload) + "\n")
+
+
+class TestAgentSessionRegistry:
+ def test_round_trips_file_state(self, tmp_path):
+ registry = AgentSessionRegistry(tmp_path / "registry.json")
+ registry.set("/tmp/session.jsonl", mtime_ns=123, size=456)
+ registry.flush()
+
+ reloaded = AgentSessionRegistry(tmp_path / "registry.json")
+ assert reloaded.get("/tmp/session.jsonl") == {"mtime_ns": 123, "size": 456}
+
+
+class TestAgentSessionWatcher:
+ def test_polls_multiple_sources_and_skips_unchanged_files(self, tmp_path):
+ codex_file = tmp_path / "codex" / "sessions" / "2026" / "04" / "18" / "session-a.jsonl"
+ cursor_file = tmp_path / "cursor" / "projects" / "brainlayer" / "agent-transcripts" / "abc" / "abc.jsonl"
+ gemini_file = tmp_path / "gemini" / "tmp" / "brainlayer" / "chats" / "session-a.json"
+
+ _write_jsonl(codex_file, {"type": "session_meta"})
+ _write_jsonl(cursor_file, {"role": "user", "message": {"content": [{"type": "text", "text": "hello world"}]}})
+ gemini_file.parent.mkdir(parents=True, exist_ok=True)
+ gemini_file.write_text(json.dumps({"sessionId": "gem-1", "messages": []}))
+
+ seen: list[tuple[str, str]] = []
+
+ def _record(source_name: str):
+ def _ingest(path: Path) -> int:
+ seen.append((source_name, str(path)))
+ return 1
+
+ return _ingest
+
+ watcher = AgentSessionWatcher(
+ registry_path=tmp_path / "agent-registry.json",
+ sources=[
+ AgentSessionSource("codex_cli", ["**/*.jsonl"], _record("codex_cli"), root=tmp_path / "codex" / "sessions"),
+ AgentSessionSource(
+ "cursor",
+ ["**/*.jsonl"],
+ _record("cursor"),
+ root=tmp_path / "cursor" / "projects",
+ ),
+ AgentSessionSource("gemini", ["**/session-*.json"], _record("gemini"), root=tmp_path / "gemini" / "tmp"),
+ ],
+ )
+
+ assert watcher.poll_once() == 3
+ assert len(seen) == 3
+
+ assert watcher.poll_once() == 0
+ assert len(seen) == 3
+
+ _append_jsonl(codex_file, {"type": "event_msg", "payload": {"type": "user_message"}})
+ assert watcher.poll_once() == 1
+ assert seen[-1][0] == "codex_cli"
diff --git a/tests/test_ingest_cursor.py b/tests/test_ingest_cursor.py
new file mode 100644
index 00000000..0845944b
--- /dev/null
+++ b/tests/test_ingest_cursor.py
@@ -0,0 +1,92 @@
+"""Tests for the Cursor session ingestion adapter."""
+
+import json
+from pathlib import Path
+
+from brainlayer.ingest.cursor import parse_cursor_session
+
+
+def _write_jsonl(path: Path, lines: list[dict]) -> None:
+    """Write each dict in *lines* to *path* as one JSON object per line."""
+    with open(path, "w") as f:
+        for line in lines:
+            f.write(json.dumps(line) + "\n")
+
+
+def _cursor_user(text: str) -> dict:
+    """Build a minimal Cursor transcript record for a user turn."""
+    return {
+        "role": "user",
+        "message": {
+            "content": [{"type": "text", "text": text}],
+        },
+    }
+
+
+def _cursor_assistant(text: str) -> dict:
+    """Build a minimal Cursor transcript record for an assistant turn."""
+    return {
+        "role": "assistant",
+        "message": {
+            "content": [{"type": "text", "text": text}],
+        },
+    }
+
+
+class TestParseCursorSession:
+    """Tests for parse_cursor_session over on-disk Cursor transcript files."""
+
+    def test_extracts_user_query_and_assistant_reply(self, tmp_path):
+        # Mirror Cursor's layout: .cursor/projects/<project>/agent-transcripts/<session>/<session>.jsonl
+        session_dir = tmp_path / ".cursor" / "projects" / "brainlayer" / "agent-transcripts" / "abc123"
+        session_dir.mkdir(parents=True)
+        session_file = session_dir / "abc123.jsonl"
+        _write_jsonl(
+            session_file,
+            [
+                _cursor_user("\nWire the missing cursor ingestion adapter.\n"),
+                _cursor_assistant(
+                    "I will inspect the parser wiring and add the missing adapter with tests."
+                ),
+            ],
+        )
+
+        entries = list(parse_cursor_session(session_file))
+
+        assert len(entries) == 2
+        assert entries[0]["content_type"] == "user_message"
+        # Surrounding whitespace is stripped from the user text.
+        assert entries[0]["content"] == "Wire the missing cursor ingestion adapter."
+        assert entries[0]["source"] == "cursor"
+        # Project and session id presumably derive from the path segments above — confirm in adapter.
+        assert entries[0]["project"] == "brainlayer"
+        assert entries[0]["session_id"] == "abc123"
+
+        assert entries[1]["content_type"] == "assistant_text"
+        assert entries[1]["source"] == "cursor"
+        assert entries[1]["project"] == "brainlayer"
+
+    def test_classifies_code_block_as_ai_code(self, tmp_path):
+        session_dir = tmp_path / ".cursor" / "projects" / "brainlayer" / "agent-transcripts" / "code-session"
+        session_dir.mkdir(parents=True)
+        session_file = session_dir / "code-session.jsonl"
+        _write_jsonl(
+            session_file,
+            [
+                _cursor_assistant(
+                    "Here is the parser update:\n```python\n"
+                    "def parse_cursor_session(path):\n return []\n```\n"
+                )
+            ],
+        )
+
+        entries = list(parse_cursor_session(session_file))
+
+        # A fenced code block in the assistant reply is classified as ai_code.
+        assert len(entries) == 1
+        assert entries[0]["content_type"] == "ai_code"
+
+    def test_skips_empty_and_short_messages(self, tmp_path):
+        session_dir = tmp_path / ".cursor" / "projects" / "brainlayer" / "agent-transcripts" / "skip-session"
+        session_dir.mkdir(parents=True)
+        session_file = session_dir / "skip-session.jsonl"
+        _write_jsonl(
+            session_file,
+            [
+                _cursor_user("ok"),
+                _cursor_assistant("short"),
+            ],
+        )
+
+        # Very short messages yield no entries at all.
+        assert list(parse_cursor_session(session_file)) == []
diff --git a/tests/test_ingest_gemini.py b/tests/test_ingest_gemini.py
new file mode 100644
index 00000000..72652497
--- /dev/null
+++ b/tests/test_ingest_gemini.py
@@ -0,0 +1,78 @@
+"""Tests for the Gemini session ingestion adapter."""
+
+import json
+from pathlib import Path
+
+from brainlayer.ingest.gemini import parse_gemini_session
+
+
+def _write_json(path: Path, payload: dict) -> None:
+    """Serialize *payload* as a single JSON document into *path*."""
+    with open(path, "w") as f:
+        json.dump(payload, f)
+
+
+class TestParseGeminiSession:
+    """Tests for parse_gemini_session over on-disk Gemini chat files."""
+
+    def test_extracts_user_and_gemini_messages(self, tmp_path):
+        # Mirror Gemini's layout: .gemini/tmp/<project>/chats/session-*.json
+        session_dir = tmp_path / ".gemini" / "tmp" / "brainlayer" / "chats"
+        session_dir.mkdir(parents=True)
+        session_file = session_dir / "session-2026-04-18T02-02-test.json"
+        _write_json(
+            session_file,
+            {
+                "sessionId": "gem-session-1",
+                "startTime": "2026-04-18T02:02:10.184Z",
+                "messages": [
+                    # "info" messages carry no conversation content and must be skipped.
+                    {"id": "info-1", "timestamp": "2026-04-18T02:02:10.300Z", "type": "info", "content": "skip me"},
+                    {
+                        "id": "user-1",
+                        "timestamp": "2026-04-18T02:02:11.000Z",
+                        "type": "user",
+                        # Content may be a list of {"text": ...} parts...
+                        "content": [{"text": "Backfill the missing Gemini sessions into BrainLayer."}],
+                    },
+                    {
+                        "id": "assistant-1",
+                        "timestamp": "2026-04-18T02:02:12.000Z",
+                        "type": "gemini",
+                        # ...or a plain string; both forms are exercised here.
+                        "content": "I will inspect the session format and backfill in batches.",
+                    },
+                ],
+            },
+        )
+
+        entries = list(parse_gemini_session(session_file))
+
+        # The info message is dropped; user and gemini messages survive.
+        assert len(entries) == 2
+        assert entries[0]["content_type"] == "user_message"
+        assert entries[0]["source"] == "gemini"
+        # Project presumably derives from the tmp/<project> path segment — confirm in adapter.
+        assert entries[0]["project"] == "brainlayer"
+        assert entries[0]["session_id"] == "gem-session-1"
+        # The per-message timestamp is passed through unchanged.
+        assert entries[0]["timestamp"] == "2026-04-18T02:02:11.000Z"
+
+        assert entries[1]["content_type"] == "assistant_text"
+        assert entries[1]["source"] == "gemini"
+        assert entries[1]["project"] == "brainlayer"
+
+    def test_classifies_code_block_as_ai_code(self, tmp_path):
+        session_dir = tmp_path / ".gemini" / "tmp" / "brainlayer" / "chats"
+        session_dir.mkdir(parents=True)
+        session_file = session_dir / "session-code.json"
+        _write_json(
+            session_file,
+            {
+                "sessionId": "gem-session-2",
+                "startTime": "2026-04-18T02:02:10.184Z",
+                "messages": [
+                    {
+                        "id": "assistant-1",
+                        "timestamp": "2026-04-18T02:02:12.000Z",
+                        "type": "gemini",
+                        "content": "```python\nprint('hello')\n```",
+                    }
+                ],
+            },
+        )
+
+        entries = list(parse_gemini_session(session_file))
+
+        # A fenced code block is classified as ai_code rather than assistant_text.
+        assert len(entries) == 1
+        assert entries[0]["content_type"] == "ai_code"