In [2]:
# %% [code] Install dependencies (run once)
%pip install --upgrade "git+https://github.com/openai/whisper.git" \
                        yt-dlp pydub tqdm pyannote.audio "torch>=2.1" torchaudio


Collecting git+https://github.com/openai/whisper.git
  Cloning https://github.com/openai/whisper.git to /private/var/folders/9c/jws73d7n2ksby8dj7d0vjq240000gn/T/pip-req-build-wy7j7p68
  Running command git clone --filter=blob:none --quiet https://github.com/openai/whisper.git /private/var/folders/9c/jws73d7n2ksby8dj7d0vjq240000gn/T/pip-req-build-wy7j7p68
  Resolved https://github.com/openai/whisper.git to commit 517a43ecd132a2089d85f4ebc044728a71d49f6e
  Installing build dependencies ... [?25ldone
[?25h  Getting requirements to build wheel ... [?25ldone
[?25h  Preparing metadata (pyproject.toml) ... [?25ldone
[?25hCollecting yt-dlp
  Downloading yt_dlp-2025.3.31-py3-none-any.whl.metadata (172 kB)
Collecting pydub
  Using cached pydub-0.25.1-py2.py3-none-any.whl.metadata (1.4 kB)
Collecting tqdm
  Using cached tqdm-4.67.1-py3-none-any.whl.metadata (57 kB)
Collecting pyannote.audio
  Downloading pyannote.audio-3.3.2-py2.py3-none-any.whl.metadata (11 kB)
Collecting torch>=2.1
  Down

In [42]:
# %% [code] Imports & paths
import os, re, json, math, tempfile, uuid
from pathlib import Path
from datetime import timedelta
from concurrent.futures import ThreadPoolExecutor, as_completed

import yt_dlp
from pydub import AudioSegment
import whisper
from pyannote.audio import Pipeline
from tqdm.auto import tqdm


In [43]:
# %% [code] Config
YOUTUBE_URL = "https://www.youtube.com/watch?v=SJKr7BPOXY0"  # 👈 paste podcast link

WHISPER_MODEL_NAME = "base.en"      # or "medium", "large-v3", etc.
CHUNK_MINUTES      = 1        
DOWNLOAD_MINUTES    = 2              # 👈 only download first N minutes     # Whisper handles ~30‑min, but 10 keeps GPU memory low
NUM_THREADS        = 4              # parallel chunk transcription
HF_TOKEN           = "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"  # speaker diarization
OUTPUT_DIR         = Path("outputs")
OUTPUT_DIR.mkdir(exist_ok=True)


In [44]:
# %% [code] 1️⃣  Download YouTube audio
def download_audio(url: str) -> Path:
    """Download highest‑quality audio, return local .wav path."""
    out = OUTPUT_DIR / f"{uuid.uuid4()}.%(ext)s"
    ydl_opts = {
        "format": "bestaudio/best",
        "outtmpl": str(out),
        "postprocessors": [
            {"key": "FFmpegExtractAudio", "preferredcodec": "wav", "preferredquality": "192"}
        ],
        "quiet": True,
        "no_warnings": True,
    }
    with yt_dlp.YoutubeDL(ydl_opts) as ydl:
        ydl.download([url])
    return next(OUTPUT_DIR.glob(out.name.replace("%(ext)s", "wav")))

wav_path = download_audio(YOUTUBE_URL)
print("Downloaded →", wav_path)

from pydub import AudioSegment

if DOWNLOAD_MINUTES:
    orig = AudioSegment.from_file(wav_path)
    trimmed = orig[: DOWNLOAD_MINUTES * 60_000 ]
    trimmed_path = wav_path.with_name(wav_path.stem + f"_first{DOWNLOAD_MINUTES}m.wav")
    trimmed.export(trimmed_path, format="wav")
    wav_path = trimmed_path
    print(f"Trimmed to {DOWNLOAD_MINUTES} min →", wav_path)


Downloaded → outputs/91c71ed3-5cef-4e54-a20b-ce9e5a9cc490.wav
Trimmed to 2 min → outputs/91c71ed3-5cef-4e54-a20b-ce9e5a9cc490_first2m.wav


In [45]:
# %% [code] 2️⃣  Chunk helper
def chunk_audio(wav_file: Path, minutes: int = CHUNK_MINUTES):
    audio = AudioSegment.from_file(wav_file)
    ms = minutes * 60_000
    chunks = []
    for i in range(0, len(audio), ms):
        chunk = audio[i : i + ms]
        chunk_path = wav_file.with_suffix(f".part{i//ms}.wav")
        chunk.export(chunk_path, format="wav")
        chunks.append((i / 1000.0, chunk_path))      # (chunk_offset_seconds, path)
    return chunks

chunks = chunk_audio(wav_path) if CHUNK_MINUTES else [(0, wav_path)]
print(f"{len(chunks)=}")


len(chunks)=2


In [46]:
# %% [code] 3️⃣  Load Whisper
device =  "cpu"
whisper_model = whisper.load_model(WHISPER_MODEL_NAME, device=device)


In [47]:
# %% [code] 4️⃣  Transcribe chunks in parallel
# %% [code] 3.1️⃣  Thread‐safety for Whisper
import threading
model_lock = threading.Lock()

# %% [code] 4️⃣  Transcribe chunks in parallel (thread‐safe)
def transcribe_one(offset_sec, path):
    # we only allow one thread at a time into whisper_model.transcribe
    with model_lock:
        result = whisper_model.transcribe(
            str(path),
            word_timestamps=True,
            verbose=False,
            fp16=device=="cuda",
            initial_prompt=None
        )
    # shift timestamps by chunk offset so they are global
    for seg in result["segments"]:
        seg["start"] += offset_sec
        seg["end"]   += offset_sec
        for wd in seg["words"]:
            wd["start"] += offset_sec
            wd["end"]   += offset_sec
    return result["segments"]

all_segments = []
with ThreadPoolExecutor(max_workers=NUM_THREADS) as ex:
    futures = [ex.submit(transcribe_one, off, p) for off, p in chunks]
    for f in tqdm(as_completed(futures), total=len(futures)):
        all_segments.extend(f.result())

all_segments.sort(key=lambda s: s["start"])
print(f"Total segments: {len(all_segments)}")



100%|██████████| 6000/6000 [00:03<00:00, 1633.49frames/s]
100%|██████████| 6000/6000 [00:03<00:00, 1749.99frames/s]
100%|██████████| 2/2 [00:07<00:00,  3.70s/it]

Total segments: 31





In [48]:
# %% [code] 5️⃣  Run PyAnnote diarization
assert HF_TOKEN, "Set HF_TOKEN env variable first!"
dia_pipeline = Pipeline.from_pretrained("pyannote/speaker-diarization-3.1",
                                         use_auth_token=HF_TOKEN,
                                         cache_dir=str(Path.home()/".cache/pyannote"))
dia_result = dia_pipeline(str(wav_path))


  std = sequences.std(dim=-1, correction=1)


In [49]:
# %% [code] 5.1️⃣  Robust assign_speakers
from itertools import groupby

def assign_speakers(segments, diarization):
    """
    Merge Whisper word‑level output with PyAnnote diarization.
    Returns a list of dicts: {speaker, start, end, text}
    """
    # ---- 1. Flatten diarization ----
    time_speakers = []
    for item in diarization.itertracks(yield_label=True):
        # item can be (segment, track, label)  OR  (segment, label)
        if len(item) == 3:
            segment, _, label = item
        elif len(item) == 2:
            segment, label = item
        else:
            raise RuntimeError(f"Unexpected tuple length {len(item)} from itertracks.")
        time_speakers.append((segment.start, segment.end, label))

    time_speakers.sort(key=lambda x: x[0])

    # ---- 2. Tag each Whisper word with the current speaker ----
    idx = 0
    for seg in segments:
        for wd in seg["words"]:
            # advance diarization pointer until word falls into the turn
            while idx < len(time_speakers) - 1 and wd["start"] >= time_speakers[idx][1]:
                idx += 1
            wd["speaker"] = time_speakers[idx][2]

    # ---- 3. Collapse consecutive words with same speaker into turns ----
    all_words = sorted(
        (w for s in segments for w in s["words"]),
        key=lambda w: w["start"]
    )
    speaker_turns = []
    for speaker, words in groupby(all_words, key=lambda w: w["speaker"]):
        words = list(words)
        speaker_turns.append({
            "speaker": speaker,
            "start":   words[0]["start"],
            "end":     words[-1]["end"],
            "text":    " ".join(w["word"] for w in words)
        })

    return speaker_turns

# ---- call as before ----
speaker_turns = assign_speakers(all_segments, dia_result)
print(f"Merged into {len(speaker_turns)} speaker turns")


Merged into 3 speaker turns


In [50]:
# %% [code] 6️⃣  Serialize outputs
def to_timestamp(sec):
    return str(timedelta(seconds=round(sec, 3)))[:-3]

txt_lines = []
for t in speaker_turns:
    line = f"[{to_timestamp(t['start'])} – {to_timestamp(t['end'])}] {t['speaker']}: {t['text']}"
    txt_lines.append(line)

txt_path  = OUTPUT_DIR / f"{wav_path.stem}_diarized.txt"
json_path = OUTPUT_DIR / f"{wav_path.stem}_diarized.json"

txt_path.write_text("\n".join(txt_lines), encoding="utf-8")
json_path.write_text(json.dumps(speaker_turns, indent=2, ensure_ascii=False))

print("Saved:")
print(" •", txt_path)
print(" •", json_path)


Saved:
 • outputs/91c71ed3-5cef-4e54-a20b-ce9e5a9cc490_first2m_diarized.txt
 • outputs/91c71ed3-5cef-4e54-a20b-ce9e5a9cc490_first2m_diarized.json


In [55]:
"""
Generate high‑level insights (topic, per‑speaker opinions, summary)
from a diarized podcast transcript using GPT‑4o.
"""

import json
import os
from pathlib import Path
from textwrap import dedent

from dotenv import load_dotenv
from openai import OpenAI

# ── 1. Configuration ────────────────────────────────────────────────────────────
load_dotenv()                                # loads .env if present
OPENAI_API_KEY = "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"  # raises KeyError if missing
MODEL_NAME = "gpt-4o"                        # alias for the latest GPT‑4o
TRANSCRIPT_FILE = Path("outputs/91c71ed3-5cef-4e54-a20b-ce9e5a9cc490_first2m_diarized.json")       # your sample file name
TEMPERATURE = 0.3                            # keep outputs focused / deterministic

# ── 2. Helper: turn JSON list → readable transcript string ──────────────────────
def format_transcript(dialogue: list[dict]) -> str:
    """Convert list of {speaker,start,end,text} into a neat text block."""
    lines = [
        f"{turn['speaker']}: {turn['text'].strip()}"
        for turn in dialogue
        if turn.get("text")
    ]
    return "\n".join(lines)

# ── 3. Call GPT‑4o and parse its JSON answer ────────────────────────────────────
def generate_insights(dialogue: list[dict]) -> dict:
    client = OpenAI(api_key=OPENAI_API_KEY)

    transcript = format_transcript(dialogue)

    prompt = dedent(
        f"""
        You are an expert conversation analyst. Analyze the following podcast
        transcript and return **only** valid JSON (no commentary) with this schema:

        {{
          "topic":   string,              # one‑line topic
          "speaker_opinions": {{
              "<speaker>": [string, …]   # bullet‑like points per speaker
          }},
          "summary": string              # concise paragraph (≤120 words)
        }}

        Transcript:
        ```
        {transcript}
        ```
        """
    )

    response = client.chat.completions.create(
        model=MODEL_NAME,
        messages=[{"role": "user", "content": prompt}],
        temperature=TEMPERATURE,
        response_format={ "type": "json_object" }
    )

    # GPT‑4o returns JSON‑formatted text – parse it safely.
    raw = response.choices[0].message.content
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        raise ValueError("Model did not return valid JSON") from exc


In [56]:
if not TRANSCRIPT_FILE.exists():
    raise SystemExit(f"Transcript file not found: {TRANSCRIPT_FILE}")

with TRANSCRIPT_FILE.open(encoding="utf-8") as f:
    dialogue = json.load(f)

insights = generate_insights(dialogue)

print(json.dumps(insights, indent=2, ensure_ascii=False))

ValueError: Model did not return valid JSON

In [57]:
response

NameError: name 'response' is not defined