diff --git a/.gitignore b/.gitignore index 05308ee..f472475 100644 --- a/.gitignore +++ b/.gitignore @@ -25,6 +25,7 @@ htmlcov/ *.db *.db-wal *.db-shm +data/ # Secrets .env diff --git a/scripts/diarize_episodes.py b/scripts/diarize_episodes.py new file mode 100644 index 0000000..7f2db45 --- /dev/null +++ b/scripts/diarize_episodes.py @@ -0,0 +1,408 @@ +#!/usr/bin/env python3 +"""Diarize guest episodes and re-index into BrainLayer. + +Pipeline per episode: + 1. Download audio via yt-dlp (wav format) + 2. Transcribe + align + diarize via WhisperX + 3. Map SPEAKER_00/01 → "Huberman"/"Guest" (by speaking time) + 4. Save diarized JSON + 5. Re-index via index_youtube.py --diarized-transcript --replace + +Usage: + # All guest episodes + python3 diarize_episodes.py + + # Single episode + python3 diarize_episodes.py --video-id zEYE-vcVKy8 + + # Dry run (download + diarize only, don't re-index) + python3 diarize_episodes.py --dry-run + + # Skip download (reuse existing audio) + python3 diarize_episodes.py --skip-download + +Prerequisites: + - WhisperX installed: pip install git+https://github.com/m-bain/whisperx.git + - HuggingFace token at ~/.huggingface/token + - pyannote model terms accepted at huggingface.co/pyannote/speaker-diarization-3.1 +""" + +import argparse +import json +import logging +import os +import subprocess +import sys +import tempfile +from pathlib import Path + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s [%(levelname)s] %(message)s", + datefmt="%H:%M:%S", +) +log = logging.getLogger(__name__) + +# Guest episodes that need diarization +EPISODES = [ + {"video_id": "zEYE-vcVKy8", "title": "Galpin: Assess & Improve Fitness", "guest": "Galpin"}, + {"video_id": "CyDLbrZK75U", "title": "Galpin: Build Strength & Muscles", "guest": "Galpin"}, + {"video_id": "oNkDA2F7CjM", "title": "Galpin: Endurance & Fat Loss", "guest": "Galpin"}, + {"video_id": "UIy-WQCZd4M", "title": "Galpin: Training Program Design", "guest": "Galpin"}, + {"video_id": 
def get_hf_token() -> str:
    """Return a HuggingFace access token.

    Sources are tried in order and the first non-empty value wins:

    1. ``~/.huggingface/token`` (the location this script has always used)
    2. ``~/.cache/huggingface/token`` (where current ``huggingface-cli login``
       stores the token)
    3. the ``HF_TOKEN`` environment variable

    An empty or unreadable token file is skipped instead of being returned as
    an empty string, so a stale file cannot mask a valid ``HF_TOKEN``.

    Raises:
        RuntimeError: if no source yields a non-empty token.
    """
    home = Path.home()
    candidate_files = (
        home / ".huggingface" / "token",
        home / ".cache" / "huggingface" / "token",
    )
    for token_path in candidate_files:
        try:
            token = token_path.read_text(encoding="utf-8").strip()
        except (OSError, UnicodeDecodeError):
            # Missing, unreadable or non-text file: try the next source.
            continue
        if token:
            return token

    token = os.environ.get("HF_TOKEN", "").strip()
    if token:
        return token
    raise RuntimeError(
        "No HuggingFace token found. Create one at https://huggingface.co/settings/tokens "
        "and save to ~/.huggingface/token"
    )


def download_audio(video_id: str, output_dir: Path) -> Path:
    """Download a YouTube video's audio track as WAV via ``yt-dlp``.

    Args:
        video_id: YouTube video ID.
        output_dir: Directory for the audio file (created if missing).

    Returns:
        Path of the audio file. An existing ``<video_id>.wav`` is reused
        without invoking yt-dlp.

    Raises:
        RuntimeError: if yt-dlp exits non-zero, or exits zero without leaving
            an audio file for ``video_id`` in ``output_dir``.
    """
    output_dir.mkdir(parents=True, exist_ok=True)
    output_path = output_dir / f"{video_id}.wav"

    if output_path.exists():
        log.info(f" Audio already exists: {output_path}")
        return output_path

    log.info(f" Downloading audio for {video_id}...")
    cmd = [
        "yt-dlp",
        "-x", "--audio-format", "wav",
        "--output", str(output_path),
        "--cookies-from-browser", "brave",
        "--remote-components", "ejs:github",
        f"https://www.youtube.com/watch?v={video_id}",
    ]
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        log.error(f" yt-dlp failed: {result.stderr}")
        raise RuntimeError(f"Failed to download audio for {video_id}")

    if not output_path.exists():
        # yt-dlp may append an extra extension (e.g. ".wav.wav"). Look for any
        # audio file for this video, preferring WAV, in a deterministic order.
        candidates = sorted(
            (p for p in output_dir.glob(f"{video_id}*") if p.suffix in (".wav", ".mp3", ".m4a")),
            key=lambda p: (p.suffix != ".wav", p.name),
        )
        if not candidates:
            # Previously a non-existent path was returned here and the failure
            # only surfaced later, inside the transcription step.
            raise RuntimeError(
                f"yt-dlp reported success but no audio file for {video_id} was found in {output_dir}"
            )
        output_path = candidates[0]

    log.info(f" Audio saved: {output_path}")
    return output_path
def diarize_audio(audio_path: Path, hf_token: str) -> list[dict]:
    """Run WhisperX transcribe + align + diarize on an audio file.

    The aligned (pre-diarization) transcript is cached next to the audio
    directory (``<audio_dir>/../aligned/<stem>_aligned.json``) so that a
    diarization failure does not force a new transcription. The cache is
    written to a temporary file and renamed into place, and an unreadable
    cache is ignored and rebuilt, so an interrupted run cannot leave behind a
    file that breaks later runs.

    Args:
        audio_path: Path to the audio file.
        hf_token: HuggingFace token passed to the diarization pipeline.

    Returns:
        Segments with speaker labels, e.g.
        ``[{"start": 0.5, "end": 2.3, "speaker": "SPEAKER_00", "text": "..."}]``.

    Raises:
        RuntimeError: if WhisperX is not installed.
    """
    try:
        import whisperx
    except ImportError as exc:
        raise RuntimeError(
            "WhisperX not installed. Run: pip install git+https://github.com/m-bain/whisperx.git"
        ) from exc

    import torch

    # CTranslate2 (faster-whisper) doesn't support MPS — use CPU for transcription
    # pyannote diarization also needs CPU (MPS support is partial)
    if torch.cuda.is_available():
        device, compute_type = "cuda", "float16"
    else:
        device, compute_type = "cpu", "int8"

    log.info(f" Device: {device}, compute: {compute_type}")

    # audio_path is data/audio/VIDEO.wav → parent.parent = data/ → data/aligned/
    cache_dir = audio_path.parent.parent / "aligned"
    cache_path = cache_dir / f"{audio_path.stem}_aligned.json"

    audio = whisperx.load_audio(str(audio_path))

    result = None
    if cache_path.exists():
        try:
            with open(cache_path, encoding="utf-8") as f:
                result = json.load(f)
            log.info(f" Using cached aligned transcript: {cache_path}")
        except (OSError, json.JSONDecodeError) as exc:
            # A truncated or corrupt cache must not block the episode forever.
            log.warning(f" Ignoring unreadable aligned cache {cache_path}: {exc}")
            result = None

    if result is None:
        # 1. Transcribe
        log.info(" Transcribing (this takes a while on CPU)...")
        model = whisperx.load_model("large-v3", device, compute_type=compute_type)
        result = model.transcribe(audio, batch_size=16 if device == "cuda" else 4)
        del model

        # 2. Align
        log.info(" Aligning...")
        model_a, metadata = whisperx.load_align_model(
            language_code=result["language"], device=device
        )
        result = whisperx.align(
            result["segments"], model_a, metadata, audio, device,
            return_char_alignments=False,
        )
        del model_a

        # Cache aligned result: write to a sibling temp file, then rename, so
        # the cache path only ever holds a complete JSON document.
        cache_dir.mkdir(parents=True, exist_ok=True)
        tmp_path = cache_path.with_name(cache_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(result, f)
        tmp_path.replace(cache_path)
        log.info(f" Cached aligned transcript: {cache_path}")

    # 3. Diarize
    log.info(" Diarizing...")
    from whisperx.diarize import DiarizationPipeline
    diarize_model = DiarizationPipeline(
        token=hf_token, device="cpu"
    )
    diarize_segments = diarize_model(audio)
    result = whisperx.assign_word_speakers(diarize_segments, result)

    del diarize_model
    if device == "cuda":
        torch.cuda.empty_cache()

    return result["segments"]
Diarize + log.info(" Diarizing...") + from whisperx.diarize import DiarizationPipeline + diarize_model = DiarizationPipeline( + token=hf_token, device="cpu" + ) + diarize_segments = diarize_model(audio) + result = whisperx.assign_word_speakers(diarize_segments, result) + + del diarize_model + if device == "cuda": + torch.cuda.empty_cache() + + return result["segments"] + + +def _detect_huberman_speaker(segments: list[dict]) -> str | None: + """Content-based detection: find which speaker ID says Huberman's intro. + + Scans early segments for self-identification phrases like + "I'm Andrew Huberman" or "Huberman Lab". Returns the speaker ID + that says these phrases, or None if not found. + """ + # Only check first 2 minutes of content + intro_phrases = [ + "i'm andrew huberman", + "my name is andrew huberman", + "i am andrew huberman", + "huberman lab", + "welcome to the huberman lab", + ] + for seg in segments: + if seg.get("start", 0) > 120: # Only check first 2 min + break + text = seg.get("text", "").lower() + for phrase in intro_phrases: + if phrase in text: + speaker = seg.get("speaker") + if speaker: + return speaker + return None + + +def map_speakers(segments: list[dict], guest_name: str) -> list[dict]: + """Map SPEAKER_00/01 to Huberman/Guest. + + Uses two strategies: + 1. Content-based: scan intro for "I'm Andrew Huberman" (most reliable) + 2. 
def map_speakers(segments: list[dict], guest_name: str) -> list[dict]:
    """Rename raw diarization labels (SPEAKER_00/01/...) to Huberman/guest.

    Huberman is identified by:
      1. Content: the speaker who says an intro phrase such as
         "I'm Andrew Huberman" (via ``_detect_huberman_speaker``).
      2. Fallback: the labelled speaker with the most speaking time.

    The remaining speakers are ranked by speaking time; the first becomes
    ``guest_name`` and further ones become ``Speaker_<n>``.

    Segments without a speaker label are left untouched. They are not counted
    as a speaker, so unattributed speech is never labelled as the guest or as
    Huberman. If the segments already contain a "Huberman" label they are
    treated as mapped and returned unchanged; otherwise a second pass could
    swap host and guest through the speaking-time fallback.

    Args:
        segments: Diarized segments; modified in place.
        guest_name: Display name for the main non-Huberman speaker.

    Returns:
        The same ``segments`` list with ``speaker`` values renamed.
    """
    speaker_time: dict[str, float] = {}
    for seg in segments:
        speaker = seg.get("speaker")
        if not speaker or speaker == "UNKNOWN":
            continue
        duration = (seg.get("end") or 0) - (seg.get("start") or 0)
        speaker_time[speaker] = speaker_time.get(speaker, 0) + max(0, duration)

    if not speaker_time:
        log.warning(" No speakers found in segments")
        return segments

    if "Huberman" in speaker_time:
        # Raw diarization labels never look like this, so a previous run has
        # already mapped these segments.
        log.info(" Speakers already mapped; leaving labels unchanged")
        return segments

    sorted_speakers = sorted(speaker_time.items(), key=lambda x: x[1], reverse=True)

    # Strategy 1: Content-based detection (preferred)
    huberman_id = _detect_huberman_speaker(segments)
    if huberman_id in speaker_time:
        log.info(f" Content-based detection: Huberman = {huberman_id}")
    else:
        # Strategy 2: Most talkative = Huberman
        huberman_id = sorted_speakers[0][0]
        log.info(f" Fallback to speaking-time: Huberman = {huberman_id}")

    speaker_map = {huberman_id: "Huberman"}
    for spk, _ in sorted_speakers:
        if spk in speaker_map:
            continue
        if guest_name not in speaker_map.values():
            speaker_map[spk] = guest_name
        else:
            speaker_map[spk] = f"Speaker_{len(speaker_map)}"

    log.info(f" Speaker mapping: {speaker_map}")
    log.info(f" Speaking time: { {speaker_map.get(k, k): f'{v:.0f}s' for k, v in speaker_time.items()} }")

    # Apply mapping to labelled segments only.
    for seg in segments:
        current = seg.get("speaker")
        if current in speaker_map:
            seg["speaker"] = speaker_map[current]

    return segments
def reindex_episode(video_id: str, diarized_path: Path) -> int:
    """Re-index an episode by running ``index_youtube.py`` on its diarized transcript.

    Runs the sibling script with ``--diarized-transcript <path> --replace``
    and echoes any "Indexed ... chunks" line it prints.

    Args:
        video_id: YouTube video ID.
        diarized_path: Path to the diarized JSON transcript.

    Returns:
        1 if the indexer exited successfully, 0 otherwise.
    """
    script_path = Path(__file__).parent / "index_youtube.py"

    cmd = [
        sys.executable,
        str(script_path),
        f"https://www.youtube.com/watch?v={video_id}",
        "--diarized-transcript", str(diarized_path),
        "--replace",
    ]

    log.info(f" Re-indexing with: {' '.join(cmd[-4:])}")
    result = subprocess.run(cmd, capture_output=True, text=True)
    if result.returncode != 0:
        log.error(f" Re-index failed: {result.stderr}")
        return 0

    # The indexer reports through `logging`, whose default stream is stderr,
    # so look for the summary line on both streams.
    for line in (result.stdout + "\n" + result.stderr).splitlines():
        if "Indexed" in line and "chunks" in line:
            log.info(f" {line.strip()}")

    return 1


def process_episode(
    episode: dict,
    hf_token: str,
    dry_run: bool = False,
    skip_download: bool = False,
) -> bool:
    """Process one episode: download → diarize → map speakers → save → re-index.

    Args:
        episode: Dict with ``video_id``, ``guest`` and ``title`` keys.
        hf_token: HuggingFace token for the diarization model.
        dry_run: If true, stop after saving the transcript (no re-index).
        skip_download: If true, require an existing ``<video_id>.wav`` in
            ``AUDIO_DIR`` instead of downloading.

    Returns:
        True only if every step that ran succeeded, including the re-index.
    """
    video_id = episode["video_id"]
    guest = episode["guest"]
    title = episode["title"]

    log.info(f"\n{'='*60}")
    log.info(f"Processing: {title} ({video_id})")
    log.info(f"Guest: {guest}")

    # 1. Download audio
    if skip_download:
        audio_path = AUDIO_DIR / f"{video_id}.wav"
        if not audio_path.exists():
            log.error(f" --skip-download but no audio at {audio_path}")
            return False
    else:
        try:
            audio_path = download_audio(video_id, AUDIO_DIR)
        except Exception as e:
            log.error(f" Download failed: {e}")
            return False

    # 2. Diarize (or reuse a transcript saved by an earlier run)
    diarized_path = DIARIZED_DIR / f"{video_id}.json"
    if diarized_path.exists():
        log.info(f" Diarized transcript already exists: {diarized_path}")
        try:
            with open(diarized_path, encoding="utf-8") as f:
                segments = json.load(f)
        except (OSError, json.JSONDecodeError) as e:
            log.error(f" Could not read diarized transcript {diarized_path}: {e}")
            return False
        # A transcript saved by this script already has mapped names. Remap
        # only if raw diarization labels are still present.
        needs_mapping = any(
            isinstance(seg, dict) and str(seg.get("speaker", "")).startswith("SPEAKER_")
            for seg in segments
        )
    else:
        try:
            segments = diarize_audio(audio_path, hf_token)
        except Exception as e:
            log.error(f" Diarization failed: {e}")
            return False
        needs_mapping = True

    if needs_mapping:
        # 3. Map speakers
        segments = map_speakers(segments, guest)

        # 4. Save diarized JSON via a temp file + rename, so an interrupted
        # run cannot leave a truncated file that later counts as a finished
        # transcript.
        DIARIZED_DIR.mkdir(parents=True, exist_ok=True)
        tmp_path = diarized_path.with_name(diarized_path.name + ".tmp")
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(segments, f, indent=2)
        tmp_path.replace(diarized_path)
        log.info(f" Saved diarized transcript: {diarized_path} ({len(segments)} segments)")

    if dry_run:
        log.info(" [DRY RUN] Skipping re-index")
        # Show sample
        for seg in segments[:5]:
            speaker = seg.get("speaker", "?")
            text = seg.get("text", "")[:80]
            log.info(f" {speaker}: {text}")
        return True

    # 5. Re-index; a zero result means the indexer failed.
    try:
        indexed = reindex_episode(video_id, diarized_path)
    except Exception as e:
        log.error(f" Re-index failed: {e}")
        return False

    return bool(indexed)
def main():
    """CLI entry point: diarize the configured guest episodes and re-index them.

    Without ``--video-id`` a batch of ``--batch-size`` episodes starting at
    ``--offset`` in ``EPISODES`` is processed. The final log line gives the
    ``--offset`` for the next batch, so repeated runs walk the whole list
    instead of reprocessing the first batch. Exits with status 1 if the
    HuggingFace token is missing, the video ID is unknown, or any episode fails.
    """
    parser = argparse.ArgumentParser(
        description="Diarize Huberman guest episodes and re-index in BrainLayer"
    )
    parser.add_argument("--video-id", help="Process single episode by video ID")
    parser.add_argument("--dry-run", action="store_true", help="Download + diarize only, skip re-index")
    parser.add_argument("--skip-download", action="store_true", help="Skip audio download (reuse existing)")
    parser.add_argument("--batch-size", type=int, default=3,
                        help="Max episodes per batch (default 3, for thermal safety)")
    parser.add_argument("--offset", type=int, default=0,
                        help="Index of the first episode of the batch (default 0)")
    args = parser.parse_args()

    # A negative or zero slice bound would silently select the wrong episodes.
    if args.batch_size < 1:
        parser.error("--batch-size must be at least 1")
    if args.offset < 0:
        parser.error("--offset must not be negative")

    # Get HF token
    try:
        hf_token = get_hf_token()
        log.info(f"HuggingFace token found ({len(hf_token)} chars)")
    except RuntimeError as e:
        log.error(str(e))
        sys.exit(1)

    # Select episodes
    if args.video_id:
        episodes = [e for e in EPISODES if e["video_id"] == args.video_id]
        if not episodes:
            log.error(f"Video ID {args.video_id} not in episode list")
            sys.exit(1)
    else:
        episodes = EPISODES[args.offset:args.offset + args.batch_size]
        log.info(f"Processing batch of {len(episodes)} / {len(EPISODES)} episodes")

    # Process
    succeeded = 0
    failed = 0
    for ep in episodes:
        if process_episode(ep, hf_token, args.dry_run, args.skip_download):
            succeeded += 1
        else:
            failed += 1

    log.info(f"\n{'='*60}")
    log.info(f"Done! Succeeded: {succeeded}, Failed: {failed}")
    if not args.video_id:
        # Only batch mode has a "next batch"; say how to reach it.
        next_offset = args.offset + len(episodes)
        remaining = len(EPISODES) - next_offset
        if remaining > 0:
            log.info(
                f"Remaining episodes: {remaining}. "
                f"Run again with --offset {next_offset} for next batch."
            )

    if failed:
        sys.exit(1)
def load_diarized_transcript(path: Path) -> list[dict]:
    """Load a WhisperX diarized JSON file and convert it to indexing segments.

    Input:  ``[{"start": 0.5, "end": 2.3, "speaker": "Huberman", "text": "..."}]``
            or ``{"segments": [...]}`` with the same entries.
    Output: ``[{"text": "HUBERMAN: ...", "start": 0.5, "duration": 1.8}]``

    Segments with empty text are dropped. The upper-cased speaker name is
    prefixed to the text when present; a missing or null speaker gives
    unprefixed text. A missing or null ``end`` gives a duration of 0, and the
    duration is never negative.

    Args:
        path: Path to the diarized JSON file.

    Returns:
        Segments in the indexer's format.

    Raises:
        ValueError: if the JSON top level is neither a list nor an object.
    """
    with open(path, encoding="utf-8") as f:
        data = json.load(f)

    # Handle both list-of-segments and {"segments": [...]} formats
    if isinstance(data, list):
        segments = data
    elif isinstance(data, dict):
        segments = data.get("segments") or []
    else:
        raise ValueError(
            f"Unsupported diarized transcript format in {path}: {type(data).__name__}"
        )

    result = []
    for seg in segments:
        # `or ""` also covers explicit JSON nulls, which .get() defaults miss.
        text = (seg.get("text") or "").strip()
        if not text:
            continue
        speaker = (seg.get("speaker") or "").strip().upper()
        if speaker:
            text = f"{speaker}: {text}"
        start = seg.get("start") or 0
        end = seg.get("end")
        if end is None:
            end = start
        result.append({
            "text": text,
            "start": start,
            "duration": max(0, end - start),
        })
    return result
def delete_video_chunks(store: VectorStore, video_id: str) -> int:
    """Remove every chunk stored for a YouTube video and report how many.

    Chunks are matched on ``source_file == "youtube:<video_id>"``. Rows in
    ``chunk_vectors`` that point at those chunks are removed before the chunks
    themselves.

    Args:
        store: Vector store whose ``conn`` is used for the queries.
        video_id: YouTube video ID.

    Returns:
        Number of chunks that matched (0 if there were none).
    """
    params = (f"youtube:{video_id}",)
    cur = store.conn.cursor()

    # Count up front so the caller learns how many rows were removed.
    counted = list(cur.execute("SELECT COUNT(*) FROM chunks WHERE source_file = ?", params))
    count = counted[0][0]
    if count == 0:
        return 0

    # Vectors go first: they reference the chunk rows by id.
    cur.execute(
        "DELETE FROM chunk_vectors WHERE chunk_id IN (SELECT id FROM chunks WHERE source_file = ?)",
        params,
    )
    cur.execute("DELETE FROM chunks WHERE source_file = ?", params)
    log.info(f" Deleted {count} existing chunks for {video_id}")
    return count
info.get("uploader", info.get("channel", "Unknown")) if info else "Unknown" + chapters = (info.get("chapters") or []) if info else [] + upload_date = info.get("upload_date", "") if info else "" + + chunks = chunk_transcript(segments, vid, title, channel, chapters, args.project) + log.info(f" Chunks: {len(chunks)}") + + if args.dry_run: + log.info(f" [DRY RUN] Would index {len(chunks)} chunks") + for i, c in enumerate(chunks[:3]): + log.info(f" Chunk {i}: {c.content[:80]}...") + total_chunks = len(chunks) + else: + log.info(f" Embedding {len(chunks)} chunks...") + embedded = embed_chunks(chunks) + source_file = f"youtube:{vid}" + chunk_data = [] + embeddings = [] + for i, ec in enumerate(embedded): + c = ec.chunk + meta = dict(c.metadata) + if upload_date: + meta["upload_date"] = upload_date + created_at = None + if upload_date and len(upload_date) == 8: + created_at = f"{upload_date[:4]}-{upload_date[4:6]}-{upload_date[6:8]}T00:00:00+00:00" + chunk_data.append({ + "id": f"{source_file}:{i}", + "content": c.content, + "metadata": meta, + "source_file": source_file, + "project": args.project, + "content_type": c.content_type.value, + "value_type": c.value.value, + "char_count": c.char_count, + "source": "youtube", + "conversation_id": source_file, + "position": i, + "created_at": created_at, + }) + embeddings.append(ec.embedding) + total_chunks = store.upsert_chunks(chunk_data, embeddings) + log.info(f" Indexed {total_chunks} chunks for '{title}'") + + total_videos = 1 if total_chunks > 0 else 0 + else: + total_chunks = index_single_video(args.url, store, args.project, args.dry_run) + total_videos = 1 if total_chunks > 0 else 0 else: # Channel or playlist if args.channel: diff --git a/src/brainlayer/mcp/__init__.py b/src/brainlayer/mcp/__init__.py index 629c0d0..43226f6 100644 --- a/src/brainlayer/mcp/__init__.py +++ b/src/brainlayer/mcp/__init__.py @@ -47,7 +47,7 @@ async def _with_timeout(coro, timeout: float = MCP_QUERY_TIMEOUT): server = Server( "brainlayer", 
instructions=( - "Memory layer for Claude Code. 6 tools:\n" + "Memory layer for Claude Code. 7 tools:\n" "- brain_search(query): semantic search across 268K+ indexed conversation chunks. " "Filters: project, file_path, chunk_id, content_type, tag, intent, importance_min. " "Routing is automatic — pass file_path for file history, chunk_id to expand context, no args for current work.\n" @@ -60,6 +60,8 @@ async def _with_timeout(coro, timeout: float = MCP_QUERY_TIMEOUT): "Creates a new searchable chunk with source='digest'.\n" "- brain_entity(query): look up a known entity in the knowledge graph. " "Returns entity type, relations, and evidence chunks.\n" + "- brain_update(action, chunk_id): update, archive, or merge existing memories. " + "action=update (change content/tags/importance), archive (soft-delete), merge (keep one, archive duplicates).\n" "Use brain_search at the start of tasks to retrieve past decisions and patterns.\n" "Use brain_store when you make a decision, hit a bug, learn something, or finish a phase.\n" 'project scoping: auto-inferred from cwd. Override with project="all" for cross-project search.\n' @@ -867,6 +869,56 @@ async def list_tools() -> list[Tool]: "required": ["query"], }, ), + Tool( + name="brain_update", + title="Update or Archive Memory", + description="""Update, archive, or merge existing memories in BrainLayer. + +Actions: +- **update**: Change content, tags, or importance of an existing memory. If content changes, re-embeds automatically. +- **archive**: Soft-delete a memory (removes from search results, keeps in DB). +- **merge**: Combine multiple duplicate memories into one. Keeps the first chunk_id, archives the rest. + +Use brain_search first to find the chunk_id(s) you want to modify. 
async def _brain_update(
    action: str,
    chunk_id: str,
    content: str | None = None,
    tags: list[str] | None = None,
    importance: int | None = None,
    merge_chunk_ids: list[str] | None = None,
):
    """Update, archive, or merge memories.

    Args:
        action: ``"update"``, ``"archive"`` or ``"merge"``.
        chunk_id: Chunk to update/archive; for merge, the chunk to keep.
        content: New content (update only). It is re-embedded when given.
        tags: New tags (update only).
        importance: New importance score (update only).
        merge_chunk_ids: For merge, the duplicate chunk IDs to archive.

    Returns:
        A one-element ``TextContent`` list holding a JSON summary of the
        action, or the result of ``_error_result`` on invalid input, a missing
        chunk, or an unexpected exception.
    """
    try:
        store = _get_vector_store()

        if action == "archive":
            if not store.archive_chunk(chunk_id):
                return _error_result(f"Chunk not found: {chunk_id}")
            return [
                TextContent(
                    type="text",
                    text=json.dumps({"action": "archived", "chunk_id": chunk_id}),
                )
            ]

        if action == "update":
            # Reject no-op requests instead of reporting "updated".
            if content is None and tags is None and importance is None:
                return _error_result("update requires at least one of: content, tags, importance")
            if content is not None and not content.strip():
                return _error_result("update content must not be empty")

            # Verify chunk exists
            if not store.get_chunk(chunk_id):
                return _error_result(f"Chunk not found: {chunk_id}")

            # Re-embed if content changed
            embedding = None
            if content is not None:
                loop = asyncio.get_running_loop()
                model = _get_embedding_model()
                embedding = await loop.run_in_executor(None, model.embed_query, content)

            ok = store.update_chunk(
                chunk_id=chunk_id,
                content=content,
                tags=tags,
                importance=float(importance) if importance is not None else None,
                embedding=embedding,
            )
            if not ok:
                return _error_result(f"Update failed for: {chunk_id}")

            changed = (("content", content), ("tags", tags), ("importance", importance))
            result = {
                "action": "updated",
                "chunk_id": chunk_id,
                "fields": [name for name, value in changed if value is not None],
            }
            return [TextContent(type="text", text=json.dumps(result))]

        if action == "merge":
            if not merge_chunk_ids:
                return _error_result("merge requires merge_chunk_ids (the duplicates to archive)")

            # Verify the keeper exists
            if not store.get_chunk(chunk_id):
                return _error_result(f"Keeper chunk not found: {chunk_id}")

            # Never archive the keeper, even if the caller listed it among
            # the duplicates, and handle each duplicate once (order kept).
            duplicates = [d for d in dict.fromkeys(merge_chunk_ids) if d != chunk_id]
            if not duplicates:
                return _error_result("merge_chunk_ids must contain at least one chunk other than chunk_id")

            archived = []
            failed = []
            for dup_id in duplicates:
                (archived if store.archive_chunk(dup_id) else failed).append(dup_id)

            result = {
                "action": "merged",
                "kept": chunk_id,
                "archived": archived,
                "failed": failed,
            }
            return [TextContent(type="text", text=json.dumps(result))]

        return _error_result(f"Unknown action: {action}. Use update, archive, or merge.")

    except Exception as e:
        # logger.exception keeps the traceback, which logger.error dropped.
        logger.exception("brain_update failed: %s", e)
        return _error_result(f"brain_update error: {e}")
def update_chunk(
    self,
    chunk_id: str,
    content: Optional[str] = None,
    tags: Optional[List[str]] = None,
    importance: Optional[float] = None,
    embedding: Optional[List[float]] = None,
) -> bool:
    """Update fields on an existing chunk.

    Only the arguments that are not ``None`` are written. Changing ``content``
    also refreshes ``char_count`` and ``summary`` (first 200 characters).
    ``importance`` is clamped to the range 1–10. A given ``embedding``
    replaces the chunk's row in ``chunk_vectors``.

    All writes go out as one UPDATE statement plus the vector swap, inside a
    single transaction, so a failure (e.g. a rejected embedding) cannot leave
    the chunk half-updated or without a vector.

    Returns:
        True if the chunk exists (even if nothing was changed), False if not.
    """
    cursor = self.conn.cursor()
    # Check chunk exists
    if not list(cursor.execute("SELECT id FROM chunks WHERE id = ?", (chunk_id,))):
        return False

    # Column names below are fixed literals; only values are bound parameters.
    assignments: List[str] = []
    params: List[Any] = []
    if content is not None:
        assignments += ["content = ?", "char_count = ?", "summary = ?"]
        params += [content, len(content), content[:200]]
    if tags is not None:
        assignments.append("tags = ?")
        params.append(json.dumps(tags))
    if importance is not None:
        assignments.append("importance = ?")
        params.append(float(max(1, min(10, importance))))

    if not assignments and embedding is None:
        return True

    # NOTE(review): relies on self.conn supporting the connection
    # context-manager protocol (sqlite3 and APSW connections both do).
    with self.conn:
        if assignments:
            cursor.execute(
                f"UPDATE chunks SET {', '.join(assignments)} WHERE id = ?",
                (*params, chunk_id),
            )
        if embedding is not None:
            cursor.execute("DELETE FROM chunk_vectors WHERE chunk_id = ?", (chunk_id,))
            cursor.execute(
                "INSERT INTO chunk_vectors (chunk_id, embedding) VALUES (?, ?)",
                (chunk_id, serialize_f32(embedding)),
            )
    return True
def archive_chunk(self, chunk_id: str) -> bool:
    """Soft-delete a chunk: mark it ARCHIVED and drop its vector row.

    The row in ``chunks`` is kept with ``value_type`` set to ``'ARCHIVED'``;
    its entry in ``chunk_vectors`` is deleted so it no longer appears in
    vector searches.

    Returns:
        True if the chunk exists, False otherwise.
    """
    key = (chunk_id,)
    cur = self.conn.cursor()
    found = list(cur.execute("SELECT id FROM chunks WHERE id = ?", key))
    if len(found) == 0:
        return False
    cur.execute("UPDATE chunks SET value_type = 'ARCHIVED' WHERE id = ?", key)
    # Remove from vector index so it doesn't appear in searches
    cur.execute("DELETE FROM chunk_vectors WHERE chunk_id = ?", key)
    return True


def get_chunk(self, chunk_id: str) -> Optional[Dict[str, Any]]:
    """Fetch one chunk by ID.

    Returns:
        A dict with keys ``id``, ``content``, ``metadata``, ``source_file``,
        ``project``, ``content_type``, ``value_type``, ``tags``,
        ``importance``, ``created_at`` and ``summary`` holding the column
        values exactly as stored, or ``None`` if no such chunk exists.
    """
    # Key order mirrors the SELECT column order below.
    keys = (
        "id",
        "content",
        "metadata",
        "source_file",
        "project",
        "content_type",
        "value_type",
        "tags",
        "importance",
        "created_at",
        "summary",
    )
    cur = self.conn.cursor()
    matches = list(
        cur.execute(
            """SELECT id, content, metadata, source_file, project, content_type,
                      value_type, tags, importance, created_at, summary
               FROM chunks WHERE id = ?""",
            (chunk_id,),
        )
    )
    if len(matches) == 0:
        return None
    first = matches[0]
    return {key: first[position] for position, key in enumerate(keys)}
search, store, recall, digest, entity, get_person.""" + """MCP server should have 7 tools: search, store, recall, digest, entity, get_person, update.""" import asyncio from brainlayer.mcp import list_tools tools = asyncio.run(list_tools()) - assert len(tools) == 6 + assert len(tools) == 7 def test_consolidated_tools_registered(self): """brain_search, brain_store, brain_recall are registered."""