In [6]:
# ==============================
# Setup
# ==============================
!pip -q install --upgrade yt-dlp youtube-transcript-api python-slugify openai requests

import os, re, json, math, requests, yt_dlp
from slugify import slugify
from typing import List, Dict, Tuple
from datetime import datetime
from collections import Counter


# OpenAI API key: prefer Colab secrets, fall back to the environment variable.
# `userdata` only exists inside Google Colab, so import it defensively instead of
# relying on an earlier notebook cell having defined the name.
try:
    from google.colab import userdata
except ImportError:
    userdata = None

OPENAI_API_KEY = None
if userdata:
    try:
        OPENAI_API_KEY = userdata.get('OPENAI_API_KEY')
    except Exception:
        # Secret missing or notebook access not granted: fall through to the env var.
        pass
if not OPENAI_API_KEY:
    OPENAI_API_KEY = os.getenv("OPENAI_API_KEY")

# Strip before the check so a whitespace-only value is treated as missing.
OPENAI_API_KEY = (OPENAI_API_KEY or "").strip()
if OPENAI_API_KEY:
    print("✅ OpenAI API-Key verfügbar")
else:
    raise RuntimeError("❌ OPENAI_API_KEY fehlt. In Colab unter 🔑 Secrets setzen oder als Env-Var exportieren.")

# Chat model name; overridable via the OPENAI_MODEL environment variable.
# It is used with `response_format={"type": "json_object"}`, so it must support JSON mode.
MODEL = os.getenv("OPENAI_MODEL", "gpt-5")

# Chunking switch, intended for short (2–5 min) videos: no chunking.
# NOTE(review): not referenced anywhere else in the visible source.
USE_CHUNKING = False

from youtube_transcript_api import YouTubeTranscriptApi, NoTranscriptFound, TranscriptsDisabled

# Caption language codes in order of preference (German variants first, then English).
PREF_LANGS = ['de', 'de-DE', 'de-AT', 'de-CH', 'en', 'en-US', 'en-GB']


# ==============================
# Utilities: Parsing & Cleaning
# ==============================
def _ytt_list(video_id: str):
    """Return the transcript list for *video_id* on old and new youtube-transcript-api versions.

    Newer releases expose an instance method ``YouTubeTranscriptApi().list()``,
    older ones only the static ``YouTubeTranscriptApi.list_transcripts()``.
    The API flavour is picked by feature detection rather than a blanket
    ``except Exception``, so genuine errors (e.g. ``TranscriptsDisabled``)
    propagate to the caller unchanged instead of being replaced by a failure
    of the fallback call.
    """
    if hasattr(YouTubeTranscriptApi, "list"):
        return YouTubeTranscriptApi().list(video_id)
    return YouTubeTranscriptApi.list_transcripts(video_id)

# Matches caption lines that start with (or consist solely of) UI annotations such as
# "[Musik]", "(Applaus)", "<tag>", "*...*" or music-note symbols.
_UI_META_PAT = re.compile(
    r"^\s*(?:\[[^\]]{0,40}\]|<[^>]+>|\([^)]+\)|\*{1,3}[^*]{0,40}\*{1,3})\s*$|"
    r"\s*\[[^\]]{2,40}\]\s*|\s*\([^)]+\)\s*|\s*♪+\s*|\s*♫+\s*",
    re.IGNORECASE
)
HTML_TAG = re.compile(r"<[^>]+>")
MULTI_SPACE = re.compile(r"\s+")

def _clean_caption_text(s: str) -> str:
    """Strip markup and caption noise from a single caption text.

    Removes HTML-like tags, a fixed set of bracketed/parenthesised sound
    annotations (German and English, case-insensitive) and music-note
    symbols, then collapses all whitespace runs to single spaces.
    Falsy input yields an empty string.
    """
    if not s:
        return ""
    cleaned = HTML_TAG.sub("", s)
    # Common UI annotations, first in square brackets, then in parentheses.
    annotation_patterns = (
        r"\[(?:music|musik|applause|lachen|geräusche|noise|intro|outro)\]",
        r"\((?:music|musik|applause|lachen|geräusche|noise|intro|outro)\)",
    )
    for pattern in annotation_patterns:
        cleaned = re.sub(pattern, "", cleaned, flags=re.I)
    for note in ("♪", "♫"):
        cleaned = cleaned.replace(note, "")
    return MULTI_SPACE.sub(" ", cleaned).strip()

def _parse_ts(ts: str) -> float:
    """Convert a caption timestamp (``hh:mm:ss.mmm`` or ``mm:ss.mmm``) to seconds.

    A decimal comma is accepted in place of the decimal point. Anything that
    does not have exactly three colon-separated fields is read as
    ``minutes:seconds``.
    """
    fields = ts.strip().replace(",", ".").split(":")
    if len(fields) != 3:
        # No hour field present: prepend a zero hour.
        fields = ["0", fields[0], fields[1]]
    hours, minutes, seconds = fields
    return int(hours)*3600 + int(minutes)*60 + float(seconds)

def _vtt_to_segments(vtt_text: str) -> List[Dict]:
    """Parse WebVTT text into caption segments ``{start, duration, text}``.

    Processing steps:
      * Blank lines, the ``WEBVTT`` header line and purely numeric lines
        (cue identifiers) are skipped.
      * Lines that appear before the first cue timing line (e.g. header
        metadata such as ``Kind: captions``) are ignored; previously they
        were merged into the first cue's text.
      * Payload lines matching ``_UI_META_PAT`` are dropped, the rest is
        cleaned with ``_clean_caption_text``.
      * A cue whose cleaned text equals the previously kept one and starts
        less than 1.5 s after it is discarded as a duplicate.

    ``duration`` is the distance to the next kept segment's start; for the
    last segment the cue's own end time is used when it can be parsed
    (otherwise 0.0).
    """
    # Pass 1: collect raw cues as [start, end_or_None, payload_lines].
    cues = []
    for raw in vtt_text.splitlines():
        line = raw.strip()
        if not line or line.upper().startswith("WEBVTT") or line.isdigit():
            continue
        if "-->" in line:
            start_part, _, end_part = line.partition("-->")
            # The end timestamp may be followed by cue settings ("align:start ...").
            end_tokens = end_part.split()
            try:
                end = _parse_ts(end_tokens[0]) if end_tokens else None
            except (ValueError, IndexError):
                end = None
            cues.append([_parse_ts(start_part), end, []])
        elif cues and not _UI_META_PAT.match(line):
            # Only collect payload once a cue is open, so header lines cannot leak in.
            cues[-1][2].append(line)

    # Pass 2: clean, de-duplicate and build the segment list.
    segments = []
    last_end = None
    for start, end, payload in cues:
        text = _clean_caption_text(" ".join(payload)).strip()
        if not text:
            continue
        if segments:
            prev = segments[-1]
            # Drop short-interval repeats of the same text.
            if text == prev["text"] and (start - prev["start"]) < 1.5:
                continue
        segments.append({"start": start, "duration": 0.0, "text": text})
        last_end = end

    # Estimate durations from the gaps between consecutive segment starts.
    for current, following in zip(segments, segments[1:]):
        current["duration"] = max(0.0, following["start"] - current["start"])
    if segments and last_end is not None:
        segments[-1]["duration"] = max(0.0, last_end - segments[-1]["start"])
    return segments

def _validate_segments(segs: List[Dict], min_segments=10, min_chars=500) -> Dict:
    """Check whether a segment list is large enough to be a usable transcript.

    Returns a dict with ``ok`` (enough segments and enough characters),
    ``len_chars`` (length of the space-joined text), ``num_segments`` and
    ``preview`` (first 300 characters with newlines replaced, plus an
    ellipsis; empty when there is no text).
    """
    joined = " ".join(seg.get("text", "") for seg in segs)
    count = len(segs)
    enough = bool(segs) and count >= min_segments and len(joined) >= min_chars
    if joined:
        preview = joined[:300].replace("\n", " ") + "…"
    else:
        preview = ""
    return {
        "ok": enough,
        "len_chars": len(joined),
        "num_segments": count,
        "preview": preview,
    }

def _pick_track(tracks: Dict) -> Tuple[str, str]:
    """Pick a subtitle track URL, preferring ``PREF_LANGS`` and the ``vtt`` format.

    Languages are tried in the order of ``PREF_LANGS`` followed by every key
    of *tracks*. Within a language, the first entry with ``ext == 'vtt'`` and
    a URL wins; otherwise the first entry that has a URL at all.

    Returns ``(url, lang)``, or ``(None, None)`` when nothing usable exists.
    """
    if not tracks:
        return None, None
    candidates = PREF_LANGS + list(tracks.keys())
    for code in candidates:
        if code not in tracks:
            continue
        first_with_url = None
        for entry in tracks[code]:
            if not entry.get('url'):
                continue
            if entry.get('ext') == 'vtt':
                return entry['url'], code
            if first_with_url is None:
                first_with_url = entry
        if first_with_url is not None:
            return first_with_url['url'], code
    return None, None

# Tried in order; the first two are the historically supported URL forms and
# stay first so that previously accepted URLs resolve exactly as before.
_VIDEO_ID_PATTERNS = tuple(re.compile(p) for p in (
    r'(?:https?:\/\/)?(?:www\.)?youtube\.com\/watch\?v=([a-zA-Z0-9_-]{11})',
    r'(?:https?:\/\/)?(?:www\.)?youtu\.be\/([a-zA-Z0-9_-]{11})',
    # "v" is not the first query parameter, e.g. watch?feature=share&v=<id>
    r'youtube\.com\/watch\?(?:[^#\s]*&)?v=([a-zA-Z0-9_-]{11})',
    # Path-based forms: shorts, embeds (incl. youtube-nocookie.com) and live links
    r'youtube(?:-nocookie)?\.com\/(?:shorts|embed|live)\/([a-zA-Z0-9_-]{11})',
))

def extract_video_id(url: str) -> str:
    """Extract the 11-character YouTube video id from *url*.

    Supported forms: ``youtube.com/watch?v=<id>`` (with ``v`` anywhere in the
    query string), ``youtu.be/<id>``, and ``/shorts/<id>``, ``/embed/<id>``,
    ``/live/<id>`` paths.

    Raises:
        ValueError: if no video id can be found in *url*.
    """
    for pattern in _VIDEO_ID_PATTERNS:
        m = pattern.search(url)
        if m:
            return m.group(1)
    raise ValueError(f"Ungültige YouTube-URL: {url}")

def get_video_title(video_id: str) -> str:
    """Look up the video title via yt-dlp without downloading the media.

    Best effort: any failure during the lookup, as well as a missing, null or
    empty title in the metadata, yields the placeholder ``"untitled"``.
    """
    try:
        with yt_dlp.YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl:
            info = ydl.extract_info(f"https://www.youtube.com/watch?v={video_id}", download=False)
        # `or` also covers a title key that is present but null/empty.
        return info.get("title") or "untitled"
    except Exception:
        return "untitled"

def _transcript_to_result(transcript, vid: str, title: str, source: str):
    """Fetch one transcript object, clean its segments and wrap them as a result dict.

    Returns None when the cleaned segments do not pass ``_validate_segments``.
    """
    fetched = transcript.fetch()
    # Newer library versions return an object with to_raw_data(); older ones a list of dicts.
    segs = fetched.to_raw_data() if hasattr(fetched, "to_raw_data") else fetched
    for s in segs:
        s["text"] = _clean_caption_text(s.get("text", ""))
    diag = _validate_segments(segs)
    if not diag["ok"]:
        return None
    diag["preview"] = (" ".join(s["text"] for s in segs)[:300] + "…")
    return {"success": True, "video_id": vid, "title": title, "lang": transcript.language_code,
            "source": source, "segments": segs, "diagnostics": diag}


def _fetch_via_transcript_api(vid: str, title: str):
    """Try youtube-transcript-api: manual transcripts first, then auto-generated ones.

    Each preferred language is tried in ``PREF_LANGS`` order. Returns the
    first valid result dict, or None when nothing usable was found.
    """
    tlist = _ytt_list(vid)
    lookups = (
        ("find_manually_created_transcript", "transcript_api(manual)"),
        ("find_generated_transcript", "transcript_api(auto)"),
    )
    reported = set()  # avoid printing the same failure once per language
    for finder_name, source in lookups:
        for code in PREF_LANGS:
            try:
                transcript = getattr(tlist, finder_name)([code])
                result = _transcript_to_result(transcript, vid, title, source)
            except NoTranscriptFound:
                # Expected: simply no transcript for this language/kind.
                continue
            except Exception as e:
                # Keep the best-effort behaviour, but do not hide the cause.
                msg = f"{type(e).__name__}: {e}"
                if msg not in reported:
                    reported.add(msg)
                    print("transcript-api Fehler:", msg)
                continue
            if result is not None:
                return result
    return None


def fetch_transcript_segments(youtube_url: str) -> Dict:
    """Load the transcript of a YouTube video as a list of timed segments.

    Strategy:
      1. youtube-transcript-api (manually created, then auto-generated
         transcripts in the preferred languages).
      2. Fallback: subtitle / automatic-caption tracks listed by yt-dlp,
         downloaded and parsed as VTT.

    Returns:
        On success ``{success, video_id, title, lang, source, segments,
        diagnostics}`` where ``segments = [{start, duration, text}, ...]``.
        On failure ``success`` is False and ``error`` holds a message; if the
        fallback transcript was downloaded but is too short, the remaining
        keys are still included for inspection.
        Exceptions are not raised; they are reported via ``error``.
    """
    try:
        vid = extract_video_id(youtube_url)
        title = get_video_title(vid)

        # 1) youtube-transcript-api: manual -> auto
        try:
            result = _fetch_via_transcript_api(vid, title)
            if result is not None:
                return result
        except (NoTranscriptFound, TranscriptsDisabled):
            pass
        except Exception as e:
            print("transcript-api Fehler:", e)

        # 2) Fallback: yt_dlp -> VTT
        with yt_dlp.YoutubeDL({'quiet': True, 'no_warnings': True}) as ydl:
            info = ydl.extract_info(f"https://www.youtube.com/watch?v={vid}", download=False)
        url, lang = _pick_track(info.get('subtitles') or {})
        source = "ytdlp(subtitles)" if url else None
        if not url:
            url, lang = _pick_track(info.get('automatic_captions') or {})
            source = "ytdlp(automatic_captions)" if url else None
        if not url:
            return {"success": False, "error": "Keine Untertitel/Auto-Captions verfügbar"}

        r = requests.get(url, timeout=20)
        r.raise_for_status()
        segs = _vtt_to_segments(r.text)
        diag = _validate_segments(segs)
        result = {"success": diag["ok"], "video_id": vid, "title": title, "lang": lang or "unknown",
                  "source": source, "segments": segs, "diagnostics": diag}
        if not diag["ok"]:
            # Callers print data["error"] on failure; never leave it undefined.
            result["error"] = (
                f"Transkript zu kurz oder leer "
                f"({diag['num_segments']} Segmente, {diag['len_chars']} Zeichen)"
            )
        return result

    except Exception as e:
        return {"success": False, "error": str(e)}


# ==============================
# Text utils
# ==============================
def segments_to_text(segments: List[Dict]) -> str:
    """Join the stripped ``text`` of all segments with a non-empty text into one string."""
    parts = []
    for seg in segments:
        if seg.get("text"):
            parts.append(seg.get("text", "").strip())
    return " ".join(parts).strip()

def mmss(seconds: float) -> str:
    """Format a duration in seconds as ``MM:SS``, rounding half up to whole seconds."""
    # Add 0.5 before truncating so the value is rounded rather than cut off.
    minutes, remainder = divmod(int(seconds + 0.5), 60)
    return f"{minutes:02d}:{remainder:02d}"

def show_diagnostics(data: Dict):
    """Print a short summary of a transcript result (or its error) to stdout."""
    if not data.get("success"):
        print("❌ Fehler:", data.get("error"))
        return
    diag = data["diagnostics"]
    summary_line = (
        f"→ Quelle: {data['source']} | Sprache: {data['lang']} | "
        f"Segmente: {diag['num_segments']} | Zeichen: {diag['len_chars']}"
    )
    print(summary_line)
    preview = diag.get("preview", "")
    print("→ Vorschau:", preview[:200], "…")
    first_text = data["segments"][0]["text"]
    print("→ Erstes Segment:", first_text[:120], "…")

def save_debug_files(title: str, segments: List[Dict]):
    """Write the segments (JSON) and the plain transcript (text) into ``output/``.

    File names are derived from the slugified *title* ("untitled" if the slug
    is empty). Prints both paths and a preview of up to 300 characters.
    """
    os.makedirs("output", exist_ok=True)
    base = slugify(title) or "untitled"
    seg_path = f"output/{base}_segments.json"
    txt_path = f"output/{base}_transcript.txt"

    with open(seg_path, "w", encoding="utf-8") as seg_file:
        json.dump({"segments": segments}, seg_file, ensure_ascii=False, indent=2)

    text = segments_to_text(segments)
    with open(txt_path, "w", encoding="utf-8") as txt_file:
        txt_file.write(text)

    print(f"🗂  Debug gespeichert: {seg_path} | {txt_path}")
    if len(text) > 300:
        preview = text[:300] + "…"
    else:
        preview = text
    print("🔎 Textvorschau:", preview)


# ==============================
# OpenAI: Single-shot JSON Call
# ==============================
from openai import OpenAI
client = OpenAI(api_key=OPENAI_API_KEY)

def llm_json(prompt: str, model: str = MODEL) -> dict:
    """Send *prompt* to the chat model in JSON mode and return the parsed reply.

    A fixed (German) system message instructs the model to answer with valid
    JSON only; the content of the first choice is parsed with ``json.loads``.
    """
    system_message = {
        "role": "system",
        "content": (
            "Du bist ein präziser Notiz-Assistent. "
            "Antworte ausschließlich als valides JSON – ohne Markdown, Erklärtexte oder Kommentare."
        ),
    }
    user_message = {"role": "user", "content": prompt}
    completion = client.chat.completions.create(
        model=model,
        response_format={"type": "json_object"},
        messages=[system_message, user_message],
    )
    raw_json = completion.choices[0].message.content
    return json.loads(raw_json)

def summarize_single_shot(title: str, text: str) -> dict:
    """Summarize a transcript with a single LLM call and return the parsed JSON.

    The prompt asks for the keys ``tldr``, ``kernaussagen``, ``outline``,
    ``zitate`` (plain quote texts, no timestamps), ``glossar`` (an object
    mapping term -> explanation) and ``offene_fragen``.

    The schema example for ``glossar`` is a valid JSON object, because the
    Markdown renderer iterates it as key/value pairs.
    """
    prompt = f"""
Analysiere das folgende Transkript und gib NUR dieses JSON zurück:
{{
  "tldr": ["Punkt 1","Punkt 2","Punkt 3","Punkt 4","Punkt 5"],
  "kernaussagen": ["Aussage 1","Aussage 2","Aussage 3","Aussage 4","Aussage 5","Aussage 6","Aussage 7","Aussage 8"],
  "outline": ["1. Abschnitt","2. Abschnitt","3. Abschnitt","4. Abschnitt","5. Abschnitt","6. Abschnitt"],
  "zitate": ["Wörtliches Zitat 1","Wörtliches Zitat 2","Wörtliches Zitat 3","Wörtliches Zitat 4"],
  "glossar": {{"Begriff 1": "Kurze Erklärung 1", "Begriff 2": "Kurze Erklärung 2"}},
  "offene_fragen": ["Frage 1","Frage 2","Frage 3"]
}}
Regeln:
- Schreibe deutsch.
- Nutze ausschließlich Inhalte aus dem Transkript (keine Halluzinationen).
- TL;DR: 3–5 kurze, konkrete Bullet Points.
- Kernaussagen: präzise, faktenbasiert (6–10 Punkte).
- Outline: logisch in 5–8 Punkten (Stichworte OK).
- "zitate": maximal 4 **wörtliche und vollständige** Sätze aus dem Transkript; KEINE Zeitstempel (die werden später gemappt).
- "glossar": JSON-Objekt (Begriff → kurze Erklärung) mit den wichtigsten Fachbegriffen aus dem Transkript.
- "offene_fragen": Liste von Fragen, die das Transkript offen lässt.
- Wenn du unsicher bist, lasse das Feld leer ([] bzw. {{}}) statt zu raten.

Titel: {title}

Transkript:
{text}
"""
    return llm_json(prompt)


# ==============================
# Sentenzbildung + Quote → Zeit
# ==============================
STOP = {
    "der","die","das","und","oder","aber","dass","den","dem","ein","eine","einen","im","in","an","am",
    "ist","sind","war","waren","mit","zu","auf","für","von","es","ich","du","wir","ihr","sie","man",
    "auch","noch","schon","nur","so","wie","wenn","dann","weil","hat","haben","wird","werden","mal"
}
TOKEN = re.compile(r"[a-zA-ZäöüÄÖÜß0-9\-]+")

def _tokenize(s): return [w.lower() for w in TOKEN.findall(s or "")]

def join_segments_into_sentences(segments, max_chars=220):
    """Merge consecutive caption segments into sentence-like units.

    Segments are accumulated until the latest one ends with sentence
    punctuation (``. ! ? …``) or the accumulated text reaches *max_chars*.
    Each unit gets the start time of its first segment. A trailing hyphen on
    the previous piece is removed and the next segment is appended directly
    (rough hyphenation repair). Units whose normalized text was already
    emitted are skipped.

    Returns a list of ``{"start": float, "text": str}``.
    """
    def _normalize(raw):
        # Lower-case, replace everything except word chars/umlauts/hyphen by spaces.
        lowered = (raw or "").lower()
        return MULTI_SPACE.sub(" ", re.sub(r"[^\wäöüÄÖÜß-]+", " ", lowered)).strip()

    result = []
    known = set()

    def _emit(joined, start):
        key = _normalize(joined)
        if key and key not in known:
            result.append({"start": start, "text": joined})
            known.add(key)

    pieces = []
    first_start = None
    for segment in segments:
        chunk = (segment.get("text") or "").strip()
        if not chunk:
            continue
        if first_start is None:
            first_start = float(segment.get("start", 0.0))
        if pieces and pieces[-1].endswith("-"):
            # Glue a hyphenated line break back together.
            pieces[-1] = pieces[-1][:-1] + chunk
        else:
            pieces.append(chunk)
        joined = " ".join(pieces).strip()
        ends_sentence = re.search(r"[.!?…]\s*$", chunk) is not None
        if ends_sentence or len(joined) >= max_chars:
            _emit(joined, first_start)
            pieces, first_start = [], None

    # Flush whatever is left without closing punctuation.
    if pieces:
        _emit(" ".join(pieces).strip(), first_start or 0.0)
    return result

def _similarity(a: str, b: str) -> float:
    """Score in [0, 1] of how well sentence *a* covers quote *b*.

    The score is the share of *b*'s distinct non-stop-word tokens that also
    occur in *a*, plus a 0.25 bonus when *b*'s token sequence appears as a
    substring of *a*'s token sequence. Returns 0.0 if either side has no
    tokens left after stop-word removal.
    """
    sentence_tokens = [tok for tok in _tokenize(a) if tok not in STOP]
    quote_tokens = [tok for tok in _tokenize(b) if tok not in STOP]
    if not (sentence_tokens and quote_tokens):
        return 0.0

    quote_vocab = set(quote_tokens)
    shared = quote_vocab.intersection(sentence_tokens)
    score = len(shared) / max(len(quote_vocab), 1)

    # Bonus when the whole quote is contained in the sentence.
    sentence_joined = " ".join(sentence_tokens)
    quote_joined = " ".join(quote_tokens)
    if quote_joined and quote_joined in sentence_joined:
        score = score + 0.25
    return min(1.0, score)

def map_quotes_to_times(quotes: List[str], segments: List[Dict], max_quotes=4, min_gap_sec=20.0):
    """Map quote texts to transcript sentences and their start times.

    Only the first *max_quotes* quotes are considered; quotes with fewer than
    six words after cleaning are skipped. For each quote the not-yet-used
    sentence with the highest ``_similarity`` is taken if it scores at least
    0.35 and starts at least *min_gap_sec* away from every earlier pick.

    Returns a list of ``{"text": <transcript sentence>, "time": "MM:SS"}``.
    """
    if not quotes:
        return []

    sentences = join_segments_into_sentences(segments)
    chosen = []         # result entries
    chosen_starts = []  # start seconds of the entries in `chosen`
    taken = set()       # indices of sentences already assigned

    for quote in quotes[:max_quotes]:
        cleaned = _clean_caption_text(quote).strip()
        if not cleaned or len(cleaned.split()) < 6:
            continue

        # Find the best-scoring sentence that is still available.
        top_idx, top_sentence, top_score = None, None, 0.0
        for idx, sentence in enumerate(sentences):
            if idx in taken:
                continue
            score = _similarity(sentence["text"], cleaned)
            if score > top_score:
                top_idx, top_sentence, top_score = idx, sentence, score

        # Threshold guards against picking an unrelated sentence.
        if not top_sentence or top_score < 0.35:
            continue

        start = top_sentence["start"]
        # Enforce a minimum distance to the picks made so far.
        if any(abs(start - earlier) < min_gap_sec for earlier in chosen_starts):
            continue

        chosen.append({"text": top_sentence["text"], "time": mmss(start)})
        chosen_starts.append(start)
        taken.add(top_idx)

    return chosen


# ==============================
# Markdown
# ==============================
def render_markdown(data: Dict, url: str, title: str) -> str:
    """Render the summary dict as a Markdown document.

    Expected keys in *data*: ``tldr``, ``kernaussagen``, ``outline``,
    ``offene_fragen`` (lists of strings), ``zitate`` (list of
    ``{"time", "text"}`` dicts) and ``glossar`` (dict term -> explanation).

    Because *data* comes from an LLM, unexpected shapes are tolerated instead
    of raising: missing/``null`` fields render as empty sections, a single
    string is treated as a one-item list, quotes given as plain strings are
    rendered without a timestamp, and a non-dict glossary is rendered as a
    plain bullet list.
    """
    def as_list(value):
        # Normalize any LLM-provided value to a list of items.
        if not value:
            return []
        if isinstance(value, str):
            return [value]
        if isinstance(value, dict):
            return list(value.values())
        if isinstance(value, (list, tuple, set)):
            return list(value)
        return [value]

    def bullets(key):
        return "\n".join(f"- {p}" for p in as_list(data.get(key)))

    outline = "\n".join(f"{i+1}. {p}" for i, p in enumerate(as_list(data.get('outline'))))

    quote_lines = []
    for q in as_list(data.get('zitate')):
        if isinstance(q, dict):
            quote_lines.append(f"- **[{q.get('time','00:00')}]** {q.get('text','')}")
        else:
            quote_lines.append(f"- {q}")
    quotes = "\n".join(quote_lines)

    glossar = data.get('glossar') or {}
    if isinstance(glossar, dict):
        glossary = "\n".join(f"- **{k}**: {v}" for k, v in glossar.items())
    else:
        glossary = "\n".join(f"- {entry}" for entry in as_list(glossar))

    return f"""# {title}

- Quelle: {url}

## TL;DR (3–5 Bullet Points)
{bullets('tldr')}

## Kernaussagen
{bullets('kernaussagen')}

## Struktur / Outline
{outline}

## Zitate mit Zeitstempel
{quotes}

## Glossar
{glossary}

## Offene Fragen
{bullets('offene_fragen')}
""".strip()

def save_markdown(md_text: str, title: str) -> str:
    """Write *md_text* (plus a trailing newline) to ``output/<slug>.md`` and return the path.

    The slug is derived from *title*; "untitled" is used when it is empty.
    """
    os.makedirs("output", exist_ok=True)
    file_stem = slugify(title) or "untitled"
    target = f"output/{file_stem}.md"
    with open(target, "w", encoding="utf-8") as handle:
        handle.write(md_text + "\n")
    return target


# ==============================
# Pipeline
# ==============================
def run_pipeline(youtube_url: str):
    """Run the full flow for one video: transcript -> LLM summary -> Markdown file.

    Returns the path of the written Markdown file, or None when no usable
    transcript could be loaded.
    """
    print("🚀 Starte Pipeline\n")
    result = fetch_transcript_segments(youtube_url)
    if not result.get("success"):
        print("❌ Abbruch:", result.get("error"))
        return None

    print("✅ Transkript geladen:")
    show_diagnostics(result)

    video_title = result["title"]
    segments = result["segments"]

    # Store debug artefacts (JSON + plain text).
    save_debug_files(video_title, segments)

    # Plain transcript text.
    plain_text = segments_to_text(segments)
    print(f"\n🧪 Reiner Text: {len(plain_text)} Zeichen")

    # Single-shot summarization.
    print("\n🤖 Verdichtung/Extraktion (LLM – Single Shot, JSON only)…")
    summary = summarize_single_shot(video_title, plain_text)

    # The LLM returns quotes as plain strings; attach timestamps afterwards.
    quotes = summary.get("zitate") or []
    if isinstance(quotes, dict):  # in case a model returns an object instead of a list
        quotes = list(quotes.values())
    quote_texts = [item if isinstance(item, str) else str(item) for item in quotes]
    summary["zitate"] = map_quotes_to_times(quote_texts, segments, max_quotes=4)

    # Render and save the Markdown document.
    markdown = render_markdown(summary, url=youtube_url, title=video_title)
    out_path = save_markdown(markdown, video_title)
    print(f"\n✅ Fertig. Datei gespeichert: {out_path}")
    return out_path


# ==============================
# Test
# ==============================
# Demo invocation: runs the complete pipeline for one sample video as soon as this
# cell/module is executed (transcript retrieval plus one LLM request).
# NOTE(review): consider an `if __name__ == "__main__":` guard if this file is ever imported.
TEST_URL = "https://www.youtube.com/watch?v=yN2iYWbwFFs"
run_pipeline(TEST_URL)


✅ OpenAI API-Key verfügbar
🚀 Starte Pipeline

✅ Transkript geladen:
→ Quelle: transcript_api(auto) | Sprache: de | Segmente: 302 | Zeichen: 11434
→ Vorschau: KI-Agenten sind die Zukunft von künstlicher Intelligenz. Aber was bedeutet das für dich konkret? Ich habe mich die letzten Jahre intensiv mit LMS auseinandergesetzt und bereits auch schon dazu unterri …
→ Erstes Segment: KI-Agenten sind die Zukunft von …
🗂  Debug gespeichert: output/ki-agenten-fur-beginner-alle-grundlagen-in-9-min-einfach-erklart_segments.json | output/ki-agenten-fur-beginner-alle-grundlagen-in-9-min-einfach-erklart_transcript.txt
🔎 Textvorschau: KI-Agenten sind die Zukunft von künstlicher Intelligenz. Aber was bedeutet das für dich konkret? Ich habe mich die letzten Jahre intensiv mit LMS auseinandergesetzt und bereits auch schon dazu unterrichtet. Wenn du jetzt im Job schon regelmäßig KI Tools nutzt und wissen willst, wie KI Agenten dich n…

🧪 Reiner Text: 11434 Zeichen

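🤖 Verdichtung/Extraktion (LLM – Single Shot, JSON only)…

✅ Fertig. Datei gespeichert: output/ki-agenten-fur-beginner-alle-grundlagen-in-9-min-einfach-erklart.md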

'output/ki-agenten-fur-beginner-alle-grundlagen-in-9-min-einfach-erklart.md'