# CouncilScribe
## Automated City Council Meeting Transcription

This notebook processes a city council meeting recording through a 6-stage pipeline:
1. **Ingest** — Normalize audio to 16kHz mono WAV
2. **Diarize** — Identify who spoke when (pyannote.audio)
3. **Transcribe** — Speech-to-text with word timestamps (faster-whisper)
4. **Identify** — Map speaker labels to real names
5. **Enroll** — Save voice profiles for future meetings
6. **Export** — Output Markdown, JSON, and SRT files

Each stage checkpoints to Google Drive so you can resume after a session timeout.

---
## 1. Setup

In [None]:
# Install dependencies (pin numpy to avoid Colab compatibility issues).
# numpy is installed by its own pip command first; `&&` only starts the second
# command (all remaining packages) if that pinned install succeeded.
!pip install -q "numpy<2.1" && pip install -q faster-whisper pyannote.audio noisereduce soundfile pydub huggingface_hub llama-cpp-python scipy requests beautifulsoup4

In [None]:
# Mount Google Drive at /content/drive.
# The default input path in the Configuration cell lives under
# /content/drive/MyDrive, so this must run before any Drive file is used.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Clone or update CouncilScribe source
import os
import sys

REPO_URL = "https://github.com/chrisandrewsedu/CouncilScribe.git"  # Update with your repo URL
REPO_DIR = "/content/CouncilScribe"

# Reuse an existing checkout when one is present (pull the latest commits);
# otherwise clone a fresh copy. {REPO_DIR} / {REPO_URL} are filled in from the
# Python variables above by the notebook's `!` shell syntax.
if os.path.exists(REPO_DIR):
    !cd {REPO_DIR} && git pull
else:
    !git clone {REPO_URL} {REPO_DIR}

# Put the repository root (not the src/ folder itself) on sys.path so that
# later cells can import the code as `src.<module>`.
if REPO_DIR not in sys.path:
    sys.path.insert(0, REPO_DIR)

print("CouncilScribe source loaded.")

In [None]:
# Authenticate Hugging Face (required for pyannote models).
# The diarization cell later retrieves the stored token with
# huggingface_hub.get_token(), so log in here before running Stage 2.
from huggingface_hub import notebook_login
notebook_login()

In [None]:
# Report which accelerator (if any) this runtime provides
import torch

if not torch.cuda.is_available():
    print("No GPU available. Will use CPU mode (slower, smaller model).")
else:
    # Only device 0 is inspected.
    gpu_name = torch.cuda.get_device_name(0)
    # total_memory is divided by 1e9 so the figure prints as (decimal) GB.
    vram = torch.cuda.get_device_properties(0).total_memory / 1e9
    print(f"GPU: {gpu_name} ({vram:.1f} GB VRAM)")

---
## 2. Configuration

In [None]:
# @title Meeting Configuration { display-mode: "form" }
# @markdown Fill in the meeting details below:

meeting_id = "2026-02-10-regular"  # @param {type:"string"}
city = "Bloomington"  # @param {type:"string"}
meeting_date = "2026-02-10"  # @param {type:"date"}
meeting_type = "Regular Session"  # @param ["Regular Session", "Special Session", "Work Session", "Committee Meeting"]

# @markdown ---
# @markdown ### Audio Source
# @markdown Choose how to provide the meeting recording:
audio_source_type = "URL"  # @param ["Local/Drive File", "URL", "CATS TV Browser"]

# @markdown **If Local/Drive File:** path to the audio/video file on Drive
audio_file_path = "/content/drive/MyDrive/CouncilScribe/meetings/input.mp4"  # @param {type:"string"}

# @markdown **If URL:** direct video URL or CATS TV page URL
audio_url = "https://catstv.net/government.php?issearch=govt&meeterid=117"  # @param {type:"string"}

# @markdown **If CATS TV Browser:** use the next cell to browse and select a meeting
# @markdown ---

num_speakers_hint = 0  # @param {type:"integer"}
apply_noise_reduction = False  # @param {type:"boolean"}
use_llm_identification = True  # @param {type:"boolean"}

# Set num_speakers to None if 0 (let pyannote auto-detect).
# Negative hints are treated the same way, since only values > 0 are kept.
num_speakers = num_speakers_hint if num_speakers_hint > 0 else None

# Resolve audio_path based on source type.
# audio_path is the single variable the later cells read; it ends up holding a
# URL string, a file path string, or None (browser mode, until a meeting is
# picked in the selection cell).
if audio_source_type == "URL":
    audio_path = audio_url
    print(f"Audio source: URL")
    print(f"  {audio_url}")
elif audio_source_type == "CATS TV Browser":
    audio_path = None  # Will be set by the browser cell below
    print("Audio source: CATS TV Browser (run the next cell to select a meeting)")
else:
    # Any other value (i.e. "Local/Drive File") uses the Drive path.
    audio_path = audio_file_path
    print(f"Audio source: Local file")
    print(f"  {audio_file_path}")

print(f"\nMeeting: {city} {meeting_type} ({meeting_date})")
# `or` substitutes the label when num_speakers is None.
print(f"Speaker hint: {num_speakers or 'auto-detect'}")

In [None]:
# @title CATS TV Meeting Browser { display-mode: "form" }
# @markdown Browse and select a meeting from the CATS TV archive.
# @markdown Only needed if **Audio Source** is set to "CATS TV Browser" above.
# @markdown
# @markdown Adjust the search URL or limit as needed:

catstv_search_url = "https://catstv.net/government.php?issearch=govt"  # @param {type:"string"}
results_limit = 25  # @param {type:"integer"}

# Project helpers; importable because the clone cell put the repo root on sys.path.
from src.download import fetch_catstv_meetings, display_catstv_meetings

print("Fetching CATS TV meeting archive...")
# The result is kept in a notebook-level variable because the selection cell
# indexes into it by meeting number.
catstv_meetings = fetch_catstv_meetings(catstv_search_url)
print(f"Found {len(catstv_meetings)} meetings.\n")
# Only `results_limit` is passed as the display limit; the full list stays in catstv_meetings.
display_catstv_meetings(catstv_meetings, limit=results_limit)

In [None]:
# @title Select a CATS TV Meeting { display-mode: "form" }
# @markdown Enter the number from the list above to select a meeting:

meeting_number = 0  # @param {type:"integer"}

# Only apply a selection when the Configuration cell asked for the browser.
# Without this check, "Run all" with a URL or Drive file configured would
# silently replace audio_path (and the meeting date/type) with archive entry 0.
# If the Configuration cell has not been run at all, audio_source_type is
# undefined and the selection is still honoured.
_configured_source = globals().get("audio_source_type")

if _configured_source is not None and _configured_source != "CATS TV Browser":
    print(
        f"Audio source is '{_configured_source}', not 'CATS TV Browser'. "
        "Skipping meeting selection; the configured audio source is unchanged."
    )
# Guard against running this cell before the browser cell
elif "catstv_meetings" not in dir() or not catstv_meetings:
    print("No meetings loaded. Run the 'CATS TV Meeting Browser' cell first.")
elif 0 <= meeting_number < len(catstv_meetings):
    selected = catstv_meetings[meeting_number]
    audio_path = selected["video_url"]

    # Auto-fill meeting metadata from CATS TV data; empty fields leave the
    # values from the Configuration cell untouched.
    if selected["date"]:
        meeting_date = selected["date"]
    if selected["name"]:
        meeting_type = selected["name"]
        if selected["subtitle"]:
            meeting_type += f" — {selected['subtitle']}"

    print(f"Selected: {selected['name']}")
    if selected["subtitle"]:
        print(f"  {selected['subtitle']}")
    print(f"  Date: {selected['date']}")
    print(f"  Duration: {selected['duration']}")
    print(f"  Video URL: {audio_path}")
    if selected["documents_url"]:
        print(f"  Documents: {selected['documents_url']}")
else:
    print(f"Invalid selection: {meeting_number}. Must be 0-{len(catstv_meetings)-1}.")

In [None]:
# Create the per-meeting working directory, checkpoint tracker, and Meeting record
from src.checkpoint import PipelineState, PipelineStage, ensure_drive_structure
from src.models import Meeting, ProcessingMetadata

# Fall back to defaults for anything the config/browser cells did not define.
# globals() is this notebook's namespace: .get() treats "never assigned" and
# "assigned but empty" alike, while .setdefault() only fills in missing names.
if not globals().get("meeting_id"):
    meeting_id = "unnamed-meeting"
    print("Warning: meeting_id not set. Run the Configuration cell first, or using default.")
globals().setdefault("city", "Unknown")
globals().setdefault("meeting_date", "")
globals().setdefault("meeting_type", "Regular Session")

# The recording itself has no usable default, so stop with an explanation.
if not globals().get("audio_path"):
    raise RuntimeError(
        "audio_path is not set. Run the Configuration cell and choose an audio source, "
        "or use the CATS TV Browser to select a meeting first."
    )

meeting_dir = ensure_drive_structure(meeting_id)
state = PipelineState(meeting_dir)

meeting = Meeting(
    meeting_id=meeting_id, city=city, date=meeting_date,
    meeting_type=meeting_type, audio_source=audio_path,
)

print(f"Meeting directory: {meeting_dir}")
print(f"Pipeline state: stage {state.completed_stage.name}")
# Anything past NOT_STARTED means an earlier run left checkpoints behind.
if state.completed_stage > PipelineStage.NOT_STARTED:
    print(f"  Resuming from checkpoint (stage {state.completed_stage.value}/6)")

---
## 3. Stage 1 — Audio Ingestion

In [None]:
import time
from src.ingest import normalize_audio

# Normalized audio that every later stage reads.
wav_path = meeting_dir / "audio.wav"

if not state.is_complete(PipelineStage.INGESTED):
    print("Stage 1: Normalizing audio...")
    t0 = time.time()
    metadata = normalize_audio(audio_path, wav_path, noise_reduce=apply_noise_reduction)
    elapsed = time.time() - t0
    state.mark_complete(PipelineStage.INGESTED)
    print(f"  Done in {elapsed:.1f}s")
else:
    print("Stage 1 already complete. Skipping.")
    # normalize_audio did not run this time, so rebuild the one metadata field
    # used below by measuring the WAV left over from the earlier run.
    metadata = {"duration_seconds": 0}
    from src.audio_utils import get_audio_duration
    metadata.update(duration_seconds=get_audio_duration(wav_path))

meeting.duration_seconds = metadata["duration_seconds"]
duration_min = meeting.duration_seconds / 60
print(f"  Audio duration: {duration_min:.1f} minutes")

---
## 4. Stage 2 — Speaker Diarization

In [None]:
import json
from src.diarize import load_diarization_pipeline, run_diarization, extract_speaker_embeddings
from src.models import Segment

# Checkpoint files written by this stage.
diarization_path = meeting_dir / "diarization.json"
embeddings_path = meeting_dir / "embeddings.json"

if not state.is_complete(PipelineStage.DIARIZED):
    print("Stage 2: Running speaker diarization (progress bar below)...")
    # Token stored earlier by notebook_login()
    from huggingface_hub import get_token
    hf_token = get_token()

    t0 = time.time()
    pipeline = load_diarization_pipeline(hf_token)
    segments = run_diarization(pipeline, wav_path, num_speakers=num_speakers)
    elapsed = time.time() - t0  # timing covers model load + diarization only

    # Persist the segments before the embedding pass
    with open(diarization_path, "w") as f:
        f.write(json.dumps([seg.to_dict() for seg in segments], indent=2))

    print("  Extracting speaker embeddings...")
    speaker_embeddings = extract_speaker_embeddings(wav_path, segments, hf_token)

    # .tolist() turns each embedding into plain lists that JSON can store
    emb_data = {label: vector.tolist() for label, vector in speaker_embeddings.items()}
    with open(embeddings_path, "w") as f:
        f.write(json.dumps(emb_data))

    # Drop the diarization model and release cached GPU memory before the next stage
    del pipeline
    import gc
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    state.mark_complete(PipelineStage.DIARIZED)
    print(f"  Done in {elapsed:.1f}s")
else:
    print("Stage 2 already complete. Loading from checkpoint...")
    with open(diarization_path, "r") as f:
        segments = list(map(Segment.from_dict, json.loads(f.read())))
    print(f"  Loaded {len(segments)} segments")

unique_speakers = {seg.speaker_label for seg in segments}
print(f"  {len(segments)} segments, {len(unique_speakers)} speakers detected")
meeting.processing_metadata.diarization_model = "pyannote/speaker-diarization-3.1"

---
## 5. Stage 3 — Transcription

In [None]:
from src.transcribe import load_whisper_model, transcribe_segments, save_raw_transcript, load_raw_transcript
from src import config

# Raw (speaker-label only) transcript; also used as the mid-stage checkpoint.
transcript_path = meeting_dir / "transcript_raw.json"

if state.is_complete(PipelineStage.TRANSCRIBED):
    print("Stage 3 already complete. Loading from checkpoint...")
    segments = load_raw_transcript(transcript_path)
    print(f"  Loaded {len(segments)} transcribed segments")
else:
    print("Stage 3: Transcribing segments...")
    resume_from = state.transcription_progress
    if resume_from > 0:
        if transcript_path.exists():
            # Load partially transcribed segments
            segments = load_raw_transcript(transcript_path)
            print(f"  Resuming from segment {resume_from}/{len(segments)}")
        else:
            # Progress was recorded but the partial transcript is gone. Resuming
            # would skip segments that have no text, so start over instead.
            print(
                f"  Warning: progress says {resume_from} segment(s) were done, but "
                f"{transcript_path} is missing. Restarting transcription from segment 0."
            )
            resume_from = 0

    t0 = time.time()
    whisper_model = load_whisper_model()

    # Recorded for the export metadata; mirrors the GPU/CPU choice reported here.
    model_name = config.WHISPER_MODEL_GPU if torch.cuda.is_available() else config.WHISPER_MODEL_CPU
    meeting.processing_metadata.transcription_model = model_name
    meeting.processing_metadata.gpu_used = torch.cuda.is_available()
    print(f"  Using model: {model_name}")

    def checkpoint_fn(current, total):
        """Save the transcript so far and record progress as `current` of `total`.

        Passed to transcribe_segments as its checkpoint callback.

        NOTE(review): `segments` is looked up when the callback fires, i.e. it
        is the list handed to transcribe_segments below. This presumably relies
        on transcribe_segments filling that list in place; confirm.
        """
        save_raw_transcript(segments, transcript_path)
        state.update_transcription_progress(current, total)
        # Avoid ZeroDivisionError if the callback is ever fired with total == 0.
        pct = (current / total) * 100 if total else 100.0
        print(f"  Checkpoint: {current}/{total} segments ({pct:.0f}%)")

    segments = transcribe_segments(
        whisper_model, wav_path, segments,
        checkpoint_callback=checkpoint_fn,
        resume_from=resume_from,
    )
    elapsed = time.time() - t0

    # Save final transcript
    save_raw_transcript(segments, transcript_path)

    # Free GPU memory
    del whisper_model
    import gc
    gc.collect()
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    state.mark_complete(PipelineStage.TRANSCRIBED)
    print(f"  Done in {elapsed:.1f}s")

# Show sample: up to the first five segments, skipping ones with no text and
# truncating long lines to 80 characters.
print("\n  Sample transcript:")
for seg in segments[:5]:
    if seg.text:
        snippet = f"{seg.text[:80]}..." if len(seg.text) > 80 else seg.text
        print(f"    [{seg.speaker_label}] {snippet}")

---
## 6. Stage 4 — Speaker Identification

In [None]:
import numpy as np
from src.identify import identify_speakers, apply_mappings_to_segments, flag_for_review
from src.enroll import load_profiles, get_stored_centroids
from src.models import SpeakerMapping

# Checkpoint holding the whole Meeting (segments + speaker mappings).
named_transcript_path = meeting_dir / "transcript_named.json"

if state.is_complete(PipelineStage.IDENTIFIED):
    print("Stage 4 already complete. Loading from checkpoint...")
    with open(named_transcript_path, "r") as f:
        meeting_data = json.load(f)
    # The checkpoint replaces both the meeting object and the working segment list.
    meeting = Meeting.from_dict(meeting_data)
    segments = meeting.segments
else:
    print("Stage 4: Identifying speakers...")

    # Load speaker embeddings (written by Stage 2); fall back to none if the
    # file is absent.
    if embeddings_path.exists():
        with open(embeddings_path, "r") as f:
            emb_data = json.load(f)
        speaker_embeddings = {k: np.array(v) for k, v in emb_data.items()}
    else:
        speaker_embeddings = {}

    # Load existing voice profiles for Layer 1
    profile_db = load_profiles()
    stored_centroids = get_stored_centroids(profile_db)
    if stored_centroids:
        print(f"  Loaded {len(stored_centroids)} voice profiles")

    # Layer 3: LLM (optional)
    llm = None
    llm_fn = None
    if use_llm_identification:
        print("  Loading LLM for speaker identification...")
        from src.llm_utils import load_llm, llm_identify_speakers, unload_llm
        llm = load_llm()
        llm_fn = lambda segs, maps: llm_identify_speakers(llm, segs, maps)

    # Run identification. The finally block releases the LLM even when
    # identification fails; otherwise the model would stay loaded in the
    # notebook session and a re-run of this cell would load a second copy.
    t0 = time.time()
    try:
        mappings = identify_speakers(
            segments, speaker_embeddings,
            stored_profiles=stored_centroids if stored_centroids else None,
            llm_identify_fn=llm_fn,
        )
        elapsed = time.time() - t0
    finally:
        # Free LLM memory
        if llm is not None:
            unload_llm(llm)
        llm = None

    # Apply to segments
    segments = apply_mappings_to_segments(segments, mappings)
    meeting.segments = segments
    meeting.speakers = mappings

    print(f"  Done in {elapsed:.1f}s")

    # Show results
    for label, m in mappings.items():
        status = "REVIEW" if m.needs_review else "OK"
        name = m.speaker_name or "(unidentified)"
        print(f"    {label} -> {name} (conf={m.confidence:.2f}, method={m.id_method}, {status})")

    review_needed = flag_for_review(mappings)
    if review_needed:
        print(f"\n  {len(review_needed)} speaker(s) need human review (see next cell)")

In [None]:
# @title Human Review (Optional) { display-mode: "form" }
# @markdown Run this cell to manually correct speaker identifications.
# @markdown Leave blank to skip. Format: SPEAKER_00=Mayor Johnson

corrections_text = ""  # @param {type:"string"}

if corrections_text.strip():
    # Entries are comma-separated LABEL=Name pairs. Each rejected entry is
    # reported so that a typo in a label is not mistaken for an applied fix.
    applied_count = 0
    for pair in corrections_text.split(","):
        pair = pair.strip()
        if not pair:
            continue  # tolerate trailing or doubled commas

        # Split on the first "=" only, so names may themselves contain "=".
        label, separator, name = pair.partition("=")
        label = label.strip()
        name = name.strip()
        if not separator or not label or not name:
            print(f"  Skipped malformed entry (expected LABEL=Name): {pair!r}")
            continue
        if label not in meeting.speakers:
            known_labels = ", ".join(map(str, meeting.speakers))
            print(f"  Skipped unknown speaker label: {label} (known labels: {known_labels})")
            continue

        # A human correction is treated as certain and clears the review flag.
        meeting.speakers[label].speaker_name = name
        meeting.speakers[label].confidence = 1.0
        meeting.speakers[label].id_method = "human_review"
        meeting.speakers[label].needs_review = False
        applied_count += 1
        print(f"  Updated: {label} -> {name}")

    if applied_count:
        # Re-apply mappings
        segments = apply_mappings_to_segments(segments, meeting.speakers)
        meeting.segments = segments
        print("  Corrections applied.")
    else:
        print("  No valid corrections found. Nothing was changed.")
else:
    print("  No corrections. Continuing.")

# Save named transcript checkpoint. Stage 4 is only marked complete here, so
# this cell must run even when there is nothing to correct.
with open(named_transcript_path, "w") as f:
    json.dump(meeting.to_dict(), f, indent=2)
state.mark_complete(PipelineStage.IDENTIFIED)
print("  Stage 4 checkpoint saved.")

---
## 7. Stage 5 — Voice Enrollment

In [None]:
from src.enroll import load_profiles, save_profiles, enroll_speakers

if not state.is_complete(PipelineStage.ENROLLED):
    print("Stage 5: Enrolling voice profiles...")

    # Embeddings come from the Stage 2 file; with no file there is nothing to enroll from
    speaker_embeddings = {}
    if embeddings_path.exists():
        with open(embeddings_path, "r") as f:
            emb_data = json.loads(f.read())
        speaker_embeddings = {label: np.array(vector) for label, vector in emb_data.items()}

    profile_db = load_profiles()
    before_count = len(profile_db.profiles)

    profile_db = enroll_speakers(
        profile_db,
        speaker_embeddings,
        meeting.speakers,
        meeting_id=meeting_id,
        segments=segments,
    )
    save_profiles(profile_db)

    # The difference in profile count is what gets reported as "new"
    after_count = len(profile_db.profiles)
    new_profiles = after_count - before_count

    state.mark_complete(PipelineStage.ENROLLED)
    print(f"  Enrolled {new_profiles} new profile(s). Total: {after_count}")
    for pid, p in profile_db.profiles.items():
        meetings_total = len(p.meetings_seen)
        print(f"    {pid}: {p.display_name} ({meetings_total} meetings, {p.total_segments_confirmed} segments)")
else:
    print("Stage 5 already complete. Skipping.")

---
## 8. Stage 6 — Export

In [None]:
from src.export import export_all

if not state.is_complete(PipelineStage.EXPORTED):
    print("Stage 6: Exporting transcript...")

    # All output files go into an exports/ folder inside the meeting directory
    export_dir = meeting_dir / "exports"
    results = export_all(meeting, export_dir)

    state.mark_complete(PipelineStage.EXPORTED)
    print("  Export complete:")
    # results is iterated as format -> path pairs
    for fmt, path in results.items():
        print(f"    {fmt}: {path}")
else:
    print("Stage 6 already complete.")

print("\nPipeline complete!")

---
## 9. Preview Transcript

In [None]:
# Render the start of the exported Markdown transcript inline
from IPython.display import Markdown, display

md_path = meeting_dir / "exports" / "transcript.md"
if not md_path.exists():
    print("No transcript found. Run the pipeline first.")
else:
    content = md_path.read_text()
    # Only the first 3000 characters are rendered; the rest is summarised
    # in a trailing italic note.
    preview = content[:3000]
    overflow = len(content) - 3000
    if overflow > 0:
        preview += f"\n\n*... ({overflow} more characters)*"
    display(Markdown(preview))

---
## 10. Utilities

In [None]:
# @title Profile Manager { display-mode: "form" }
# @markdown View or manage stored voice profiles.

action = "list"  # @param ["list", "delete"]
profile_to_delete = ""  # @param {type:"string"}

from src.enroll import load_profiles, save_profiles

db = load_profiles()

# Ignore stray whitespace typed into the form field so that it cannot cause a
# false "Profile not found".
profile_id = profile_to_delete.strip()

if action == "list":
    if not db.profiles:
        print("No profiles stored yet.")
    else:
        print(f"Stored profiles ({len(db.profiles)}):")
        for pid, p in db.profiles.items():
            print(f"  {pid}: {p.display_name}")
            print(f"    Meetings: {', '.join(p.meetings_seen)}")
            print(f"    Confirmed segments: {p.total_segments_confirmed}")
            print(f"    Embeddings: {len(p.embeddings)}")

elif action == "delete":
    if not profile_id:
        # Say why nothing happened instead of finishing silently.
        print("No profile specified. Enter a profile ID in 'profile_to_delete' to delete it.")
        print(f"Available: {', '.join(db.profiles.keys())}")
    elif profile_id in db.profiles:
        del db.profiles[profile_id]
        # The deletion only persists once the database is saved back.
        save_profiles(db)
        print(f"Deleted profile: {profile_id}")
    else:
        print(f"Profile not found: {profile_id}")
        print(f"Available: {', '.join(db.profiles.keys())}")

In [None]:
# @title Batch Processing { display-mode: "form" }
# @markdown Process multiple meetings from a folder.
# @markdown Audio files should be in: /content/drive/MyDrive/CouncilScribe/meetings/batch/

batch_city = "Springfield"  # @param {type:"string"}
batch_folder = "/content/drive/MyDrive/CouncilScribe/meetings/batch"  # @param {type:"string"}

from pathlib import Path

# Media suffixes treated as meeting recordings (matched case-insensitively).
audio_extensions = {".mp4", ".mkv", ".wav", ".mp3", ".m4a", ".ogg", ".flac"}


def find_batch_audio_files(folder):
    """Return the recordings directly inside *folder*, sorted by path.

    Only regular files whose suffix is in ``audio_extensions`` are returned.
    Sub-directories are skipped even if their name ends in a media suffix, and
    the search does not descend into them.

    Args:
        folder: Directory to scan (``str`` or ``Path``). It must exist.

    Returns:
        A sorted list of ``Path`` objects.
    """
    return sorted(
        entry
        for entry in Path(folder).iterdir()
        if entry.is_file() and entry.suffix.lower() in audio_extensions
    )


batch_path = Path(batch_folder)
# is_dir() rather than exists(): a path that points at a file cannot be listed.
if batch_path.is_dir():
    files = find_batch_audio_files(batch_path)
    print(f"Found {len(files)} audio files:")
    for f in files:
        print(f"  {f.name}")
    # This cell only lists candidates; it does not run the pipeline itself.
    print("\nTo process these, update the Configuration cell for each file and run the pipeline.")
else:
    print(f"Batch folder not found: {batch_folder}")
    print("Create it and add audio files to use batch processing.")