Pipeline A: Whisper -> LLM-Diarisierung
Kein Chunking, kein JSON
Output: [Sprecher 1]: ... 

In [None]:
### Imports + Konfiguration
import json
import os
import time
import warnings
from datetime import datetime
from pathlib import Path

import requests
from dotenv import load_dotenv

load_dotenv()


def _require_env(name: str) -> str:
    """Return the value of the environment variable *name*.

    Raises:
        RuntimeError: if the variable is unset or empty. Without this check a missing
            path variable only surfaces as an opaque ``TypeError`` from ``Path(None)``
            or ``REPO_ROOT / None``.
    """
    value = os.getenv(name)
    if not value:
        raise RuntimeError(f"{name} fehlt in .env")
    return value


# Load .env variables (not public)
VLLM_BASE_URL = os.getenv("VLLM_BASE_URL")
VLLM_MODEL = os.getenv("VLLM_MODEL")
REPO_ROOT = Path(_require_env("REPO_ROOT")).expanduser().resolve()
TEST_AUDIO = REPO_ROOT / _require_env("TEST_AUDIO")  # audio file from .env used for single-file tests

# Audio folders: raw inputs and their normalised (16 kHz mono WAV) versions
AUDIO_DIR = REPO_ROOT / "data" / "input_audio"
CONVERTED_DIR = REPO_ROOT / "data" / "normalised_audio"

In [None]:
### Audio-Normalisierung via ffmpeg, alles konvertieren zu WAV 16khz Mono -> Wichtig für Pyannote
import subprocess
from pathlib import Path

def ensure_wav_16k_mono(input_path: str | Path, out_dir: str | Path) -> Path:
    input_path = Path(input_path)
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    out_path = out_dir / f"{input_path.stem}_16k_mono.wav"

    # Reuse, wenn schon vorhanden
    if out_path.exists() and out_path.stat().st_size > 0:
        return out_path

    cmd = [
        "ffmpeg", "-y",
        "-i", str(input_path),
        "-ac", "1",        # mono
        "-ar", "16000",    # 16kHz
        "-vn",             # no video
        str(out_path),
    ]
    res = subprocess.run(cmd, capture_output=True, text=True)
    if res.returncode != 0:
        raise RuntimeError(f"ffmpeg failed:\n{res.stderr}")
    return out_path

In [None]:
### Conversion test
# When enabled, replaces TEST_AUDIO with its normalised (16 kHz mono WAV) version
conversion_test = False
if conversion_test:
    TEST_AUDIO = ensure_wav_16k_mono(TEST_AUDIO, CONVERTED_DIR)

    print("WAV:", TEST_AUDIO)
    print("Exists:", TEST_AUDIO.exists(), "Size:", TEST_AUDIO.stat().st_size)

In [None]:
### vLLM client

def chat_vllm(messages, model=VLLM_MODEL, temperature=0.0, max_tokens=200, timeout=600):
    """Send a chat-completion request to the vLLM endpoint and return the reply text.

    Args:
        messages: Chat messages, e.g. ``[{"role": "user", "content": "..."}]``.
        model: Model name sent in the payload (defaults to ``VLLM_MODEL`` from .env).
        temperature: Sampling temperature.
        max_tokens: Upper bound on generated tokens.
        timeout: HTTP timeout in seconds.

    Returns:
        The ``content`` of the first choice. If generation stopped because ``max_tokens`` was
        reached, a ``RuntimeWarning`` is emitted because the returned text is incomplete.

    Raises:
        requests.HTTPError: on a non-2xx HTTP status.
        RuntimeError: if the first choice carries no ``content``.
    """
    # Tolerate a trailing slash in VLLM_BASE_URL so the path never becomes "//chat/completions".
    base_url = str(VLLM_BASE_URL).rstrip("/")
    url = f"{base_url}/chat/completions"
    payload = {
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
    }
    r = requests.post(url, json=payload, timeout=timeout)
    r.raise_for_status()
    data = r.json()

    choice = data["choices"][0]
    content = choice["message"].get("content", None)
    if content is None:
        raise RuntimeError(f"LLM returned no content. Full response: {json.dumps(data)[:2000]}")
    # finish_reason "length" means the token budget ran out mid-answer: the text is cut off.
    if choice.get("finish_reason") == "length":
        warnings.warn(
            f"LLM output truncated at max_tokens={max_tokens} (finish_reason='length').",
            RuntimeWarning,
            stacklevel=2,
        )
    return content


In [None]:
# Quick LLM smoke test: enabled by default; set to False to skip the request
llm_test = True

if llm_test:
    print(chat_vllm([{"role":"user","content":"Antworte nur mit OK"}]))

Whisper-Transkription des Audios via faster-whisper
- lokal reproduzierbar, keine Cloud
- Segment-Zeitstempel werden für den späteren Pyannote-Merge mitgespeichert (für die LLM-Diarisierung nicht benötigt)

In [None]:
### Whisper configuration
from faster_whisper import WhisperModel

# Defaults are the small CPU test setup. Override them via .env, e.g. WHISPER_MODEL_SIZE=large-v3
# for production, or WHISPER_DEVICE=cuda on a GPU (which requires adjusting the other settings too).
WHISPER_MODEL_SIZE = os.getenv("WHISPER_MODEL_SIZE", "small")
WHISPER_DEVICE = os.getenv("WHISPER_DEVICE", "cpu")  # CPU: slower, but available everywhere
WHISPER_COMPUTE_TYPE = os.getenv("WHISPER_COMPUTE_TYPE", "int8")  # int8 works well on CPU

whisper_model = WhisperModel(
    WHISPER_MODEL_SIZE,
    device=WHISPER_DEVICE,
    compute_type=WHISPER_COMPUTE_TYPE,
)

In [None]:
### Transcription
def transcribe_faster_whisper(audio_path: str | os.PathLike, language: str | None = None):
    """Transcribe an audio file with the global faster-whisper model.

    Args:
        audio_path: Path to the audio file, as ``str`` or a path-like object such as ``pathlib.Path``.
        language: Language code (e.g. "de"); ``None`` leaves the choice to faster-whisper.

    Returns:
        ``(transcript, segments, info)`` where
            transcript: the stripped segment texts joined with newlines,
            segments: a list of ``{"start", "end", "text"}`` dicts with float start/end
                (kept for the later pyannote mapping),
            info: the info object returned by ``whisper_model.transcribe``.
    """
    # faster-whisper documents a str path (or file-like/array) as input. Converting path-like
    # objects (e.g. TEST_AUDIO) avoids depending on whether the installed faster-whisper/PyAV
    # version also accepts pathlib.Path.
    if isinstance(audio_path, os.PathLike):
        audio_path = os.fspath(audio_path)

    segments_iter, info = whisper_model.transcribe(
        audio_path,
        language=language,
        vad_filter=True,
        word_timestamps=False,  # switch to True later if word-level timing is needed
    )
    segments = []
    texts = []
    for seg in segments_iter:
        txt = seg.text.strip()
        segments.append({"start": float(seg.start), "end": float(seg.end), "text": txt})
        texts.append(txt)
    transcript = "\n".join(texts).strip()
    return transcript, segments, info

In [None]:
### Whisper test on TEST_AUDIO; set to True to run
whisper_test = False
if whisper_test:
    # Show detected language, the first 1000 transcript characters and the first 3 segments
    t, segs, info = transcribe_faster_whisper(TEST_AUDIO, language="de")
    print("Language:", info.language, "Prob:", info.language_probability)
    print("Transcript:\n", t[:1000])
    print("\nFirst segments:", segs[:3])

### LLM-Diarisierung Prompt
System- und User-Prompts

In [None]:
def diarize_with_llm(transcript: str, max_tokens: int = 2000):
    """Ask the LLM to add speaker labels ('[Sprecher 1]', '[Sprecher 2]', ...) to a plain transcript.

    The whole transcript is sent in a single request (no chunking).

    Args:
        transcript: Plain transcript text (e.g. the Whisper output).
        max_tokens: Generation budget for the reply. The reply repeats the full transcript
            plus labels, so long transcripts need more than the default 2000 tokens or the
            answer is cut off.

    Returns:
        The labelled transcript text returned by ``chat_vllm``.
    """
    system_prompt = (
        "You are a professional conversation diarization engine. "
        "Assign speaker labels logically solely based on the text. "
        "No reasoning. "  # Added for testing: otherwise response times get too long.
        "Output only the diarized transcript."
    )

    user_prompt = (
        "Add speaker labels to the following transcript. "
        "Use '[Sprecher 1]', '[Sprecher 2]', ... for the different speakers. "
        "Check thoroughly for every sentence if the assignment to the speaker is contextually and logically plausible. "
        "Produce a readable dialogue format between Speakers.\n\n"
        f"{transcript}"
    )

    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "user", "content": user_prompt},
    ]
    return chat_vllm(messages, temperature=0.0, max_tokens=max_tokens)

In [None]:
# Combined test on TEST_AUDIO: Whisper transcription -> LLM diarization; set to True to run
whisper_llm_test = False
if whisper_llm_test:
    print("Starte Transkription mit faster-whisper")
    t0 = time.time()
    transcript, segs, info = transcribe_faster_whisper(TEST_AUDIO, language="de")
    t1 = time.time()
    print(f"Whisper-Transkription abgeschlossen in ({t1 - t0:.2f}s)")
    print("LLM-Diarisierung gestartet")
    t2 = time.time()
    diarized = diarize_with_llm(transcript)
    t3 = time.time()
    print(f"LLM-Diarisierung abgeschlossen in ({t3 - t2:.2f}s)")
    print(diarized)

In [None]:
### Pyannote setup (CPU only)
import torch
from pyannote.audio import Pipeline

HF_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
# Explicit raise instead of assert: assert statements are stripped under `python -O`.
if not HF_TOKEN:
    raise RuntimeError("HUGGINGFACE_TOKEN fehlt in .env")

PYANNOTE_MODEL_ID = os.getenv("PYANNOTE_MODEL_ID")
if not PYANNOTE_MODEL_ID:
    raise RuntimeError("PYANNOTE_MODEL_ID fehlt in .env")

pyannote_pipeline = Pipeline.from_pretrained(PYANNOTE_MODEL_ID, token=HF_TOKEN)
# NOTE(review): some pyannote.audio versions return None instead of raising when download or
# authentication fails; fail with a clear message instead of an AttributeError on `.to()`.
if pyannote_pipeline is None:
    raise RuntimeError(f"Pyannote-Pipeline {PYANNOTE_MODEL_ID!r} konnte nicht geladen werden")

# Force CPU execution
pyannote_pipeline.to(torch.device("cpu"))


In [None]:
### Run pyannote
def run_pyannote(wav_path: Path):
    """Run the global pyannote pipeline on *wav_path* and return its raw diarization result."""
    return pyannote_pipeline(str(wav_path))

In [None]:
### Minimales Pyannote Post-Processing (Mapping auf die Whisper-Segmente)
import re

### Pipeline: ensure_wav_16k_mono -> transcribe_faster_whisper -> run_pyannote -> label_whisper_segments -> normalize_speakers 
### (für Lesbarkeit, optional) -> merge_same_speakers (für Lesbarkeit, optional) -> format_dialogue (optional?)

def pyannote_turns(diarization):
    """Return the speaker turns as ``(start, end, speaker)`` tuples sorted by start, then end.

    *diarization* may be an annotation exposing ``itertracks`` directly, or an object that
    holds one in ``.speaker_diarization`` (the latter is what our pipeline returns).
    """
    ann = getattr(diarization, "speaker_diarization", diarization)
    turns = [
        (float(turn.start), float(turn.end), str(speaker))
        for turn, _, speaker in ann.itertracks(yield_label=True)
    ]
    turns.sort(key=lambda x: (x[0], x[1]))
    return turns


def overlap(a0, a1, b0, b1):
    """Return the length of the intersection of [a0, a1] and [b0, b1]; 0.0 if they are disjoint."""
    return max(0.0, min(a1, b1) - max(a0, b0))


# Main mapping function
def label_whisper_segments(segments, diarization, min_overlap_s=0.05):
    """Assign each Whisper segment the pyannote speaker with the largest overlap.

    Overlap is summed per speaker over all of that speaker's turns. Without summing, a
    speaker with several short turns inside one segment would lose to another speaker's
    single longer turn. Ties go to the speaker whose turn appears first.

    Args:
        segments: Dicts with "start", "end" (seconds) and optional "text".
        diarization: Pyannote result accepted by ``pyannote_turns``.
        min_overlap_s: Minimum summed overlap in seconds; below it the speaker is "UNK".

    Returns:
        One dict per segment with "start", "end" (kept for later merging), "speaker_raw"
        (pyannote label or "UNK") and the stripped "text".
    """
    turns = pyannote_turns(diarization)
    labeled = []

    for seg in segments:
        s0, s1 = float(seg["start"]), float(seg["end"])
        per_speaker = {}

        for t0, t1, spk in turns:
            if t1 <= s0:
                continue
            if t0 >= s1:
                break  # turns are sorted by start: no later turn can overlap this segment
            ol = overlap(s0, s1, t0, t1)
            if ol > 0.0:
                per_speaker[spk] = per_speaker.get(spk, 0.0) + ol

        best_spk, best_ol = "UNK", 0.0
        for spk, ol in per_speaker.items():
            if ol > best_ol:
                best_spk, best_ol = spk, ol

        if best_ol < min_overlap_s:
            best_spk = "UNK"

        labeled.append({
            "start": s0, "end": s1,  # kept internally for merge_same_speaker
            "speaker_raw": best_spk,
            "text": (seg.get("text") or "").strip()
        })

    return labeled

### Cosmetic step for comparability with the LLM output: SPEAKER_00 -> [Sprecher 1]
def normalize_speakers(labeled):
    """Return copies of *labeled* with a "speaker" key mapping raw pyannote labels to display labels.

    Raw labels are numbered in order of first appearance ("[Sprecher 1]", "[Sprecher 2]", ...).
    "UNK" stays "UNK". The input dicts are not modified, so each pipeline stage's result
    stays inspectable on its own when debugging.
    """
    mapping = {}
    for x in labeled:
        raw = x["speaker_raw"]
        if raw != "UNK" and raw not in mapping:
            mapping[raw] = f"[Sprecher {len(mapping) + 1}]"
    return [{**x, "speaker": mapping.get(x["speaker_raw"], "UNK")} for x in labeled]

def merge_same_speaker(labeled, max_gap_s=0.8):
    """Join consecutive same-speaker segments whose gap is at most *max_gap_s* seconds.

    This step is only for readability. Segments with empty text are dropped, and the input
    dicts are copied, never modified. Merged entries keep the first segment's start and take
    the last segment's end.

    Note: ``max_gap_s=0`` still merges same-speaker segments that touch or overlap
    (gap <= 0). Pass ``float("-inf")`` to disable merging entirely.
    """
    merged = []
    for seg in labeled:
        if not seg["text"]:
            continue
        if merged:
            last = merged[-1]
            gap = seg["start"] - last["end"]
            if seg["speaker"] == last["speaker"] and gap <= max_gap_s:
                last["text"] = " ".join((last["text"].rstrip(), seg["text"].lstrip())).strip()
                last["end"] = seg["end"]
                continue
        merged.append(dict(seg))
    return merged

def format_dialogue(utterances):
    """Render utterances as "<speaker>: <text>" lines joined by newlines.

    Whitespace runs inside each text collapse to single spaces, and utterances whose text
    ends up empty are skipped (their speaker is then never looked up).
    """
    cleaned = ((u, re.sub(r"\s+", " ", u["text"]).strip()) for u in utterances)
    return "\n".join(f"{u['speaker']}: {txt}" for u, txt in cleaned if txt)

In [None]:
### Pyannote test with a prior transcription (for the segments); set to True when needed.
# Off by default: it runs a full CPU diarization of TEST_AUDIO, which Pipeline A does not need.
pyannote_test = False
if pyannote_test:
    print("Normalisiere Datei für Pyannote (wav-Datei mit 16k mono)")
    wav_path = ensure_wav_16k_mono(TEST_AUDIO, CONVERTED_DIR)
    print("Normalisierung abgeschlossen")
    print("Starte Transkription mit faster-whisper")
    t0 = time.time()
    # str(): transcribe_faster_whisper is typed for a str path
    transcript, segs, info = transcribe_faster_whisper(str(wav_path), language="de")
    t1 = time.time()
    print(f"Whisper-Transkription abgeschlossen in ({t1 - t0:.2f}s)")
    print("Pyannote gestartet...")
    t2 = time.time()
    diarization = run_pyannote(wav_path)
    t3 = time.time()
    print(f"Pyannote abgeschlossen in ({t3 - t2:.2f}s)")
    print("Text-Aufbereitung begonnen")
    t4 = time.time()
    # Keep every stage in its own variable for debugging
    labeled = label_whisper_segments(segs, diarization, min_overlap_s=0.05)  # 50 ms, same as the function default
    labeled2 = normalize_speakers(labeled)
    labeled3 = merge_same_speaker(labeled2, max_gap_s=0.8)
    labeled4 = format_dialogue(labeled3)
    t5 = time.time()
    print(f"Nachbearbeitung abgeschlossen in ({t5-t4:.2f}s)")
    print(f"Gesamt-Pipeline (Pyannote + Nachbereitung) abgeschlossen in ({t5-t2:.2f}s)")

    print("End-Ausgabe:\n" + labeled4)

Batch-Loop für alle Audios
- Whisper -> LLM

In [None]:
### Prepare batch input audio
# Batch input folder (the same location as AUDIO_DIR)
BATCH_AUDIO_DIR = REPO_ROOT.joinpath("data", "input_audio")
BATCH_EXTS = {".wav", ".mp3", ".m4a", ".flac", ".ogg"}

def list_audio_files(folder: Path) -> list[Path]:
    files = []
    for p in folder.rglob("*"):
        if p.is_file() and p.suffix.lower() in BATCH_EXTS:
            files.append(p)
    return sorted(files)

# Collect all batch inputs and preview the first 10 paths
audio_files = list_audio_files(BATCH_AUDIO_DIR)
print("Found files:", len(audio_files))
for f in audio_files[:10]:
    print("-", f)

In [None]:
### Result storage (runtime metadata and outputs per run)
RESULTS_DIR = REPO_ROOT.joinpath("results")

def save_run(out_dir: Path, *, meta: dict, transcript: str, diarized: str):
    """Write one run's artefacts into *out_dir*, creating the directory if needed.

    Files: ``meta.json`` (pretty-printed, non-ASCII kept), ``transcript.txt`` and
    ``llm_diarized.txt``, all UTF-8 and written in that order.
    """
    out_dir.mkdir(parents=True, exist_ok=True)
    artefacts = {
        "meta.json": json.dumps(meta, indent=2, ensure_ascii=False),
        "transcript.txt": transcript,
        "llm_diarized.txt": diarized,
    }
    for filename, text in artefacts.items():
        (out_dir / filename).write_text(text, encoding="utf-8")

In [None]:
### Main pipeline for batch LLM diarization (one file)
def run_whisper_llm_on_file(audio_path: Path, *, clip_chars: int | None = None) -> dict:
    """Normalise, transcribe and LLM-diarize one audio file, save the results and return the metadata.

    Results are written via ``save_run`` to ``RESULTS_DIR / <audio stem> / <VLLM_MODEL>``.

    Args:
        audio_path: Input audio file (any format ffmpeg can read).
        clip_chars: If given, only the first ``clip_chars`` characters of the transcript are
            sent to the LLM (e.g. to shorten test runs); ``None`` sends the whole transcript.

    Returns:
        The metadata dict that is also saved as ``meta.json`` (timings in seconds, sizes, models).

    Raises:
        ValueError: if ``clip_chars`` is negative.
    """
    if clip_chars is not None and clip_chars < 0:
        raise ValueError(f"clip_chars must be >= 0 or None, got {clip_chars}")

    t0 = time.time()

    # 1) Normalise to 16 kHz mono WAV (an existing conversion is reused)
    wav_path = ensure_wav_16k_mono(audio_path, CONVERTED_DIR)

    # 2) Transcribe; the segment timestamps are not needed for LLM diarization
    t1 = time.time()
    transcript, _segs, info = transcribe_faster_whisper(str(wav_path), language="de")
    t2 = time.time()

    # 3) LLM diarization on the (optionally clipped) transcript
    llm_input = transcript if clip_chars is None else transcript[:clip_chars]
    diarized = diarize_with_llm(llm_input)
    t3 = time.time()

    meta = {
        "timestamp": datetime.now().isoformat(),
        "audio_path": str(audio_path),
        "wav_path": str(wav_path),
        "vllm_base_url": VLLM_BASE_URL,
        "vllm_model": VLLM_MODEL,
        "whisper_model_size": WHISPER_MODEL_SIZE,
        "whisper_device": WHISPER_DEVICE,
        "whisper_compute_type": WHISPER_COMPUTE_TYPE,
        "transcript_chars": len(transcript),
        "transcript_words": len(transcript.split()),
        "llm_input_chars": len(llm_input),
        "clip_chars": clip_chars,
        "llm_output_chars": len(diarized),
        "seconds_total": round(t3 - t0, 2),
        "seconds_normalize": round(t1 - t0, 2),
        "seconds_transcribe": round(t2 - t1, 2),
        "seconds_llm": round(t3 - t2, 2),
        "language": getattr(info, "language", None),
        "language_probability": float(getattr(info, "language_probability", 0.0) or 0.0),
    }

    # Save artefacts
    out_dir = RESULTS_DIR / audio_path.stem / VLLM_MODEL
    save_run(out_dir, meta=meta, transcript=transcript, diarized=diarized)

    return meta

In [None]:
### Run the pipeline for all audio files

# Characters of each transcript sent to the LLM; None sends the full transcript (no chunking).
# Set e.g. to 4000 to shorten test runs.
BATCH_CLIP_CHARS = None

metas = []
failures = []

for i, f in enumerate(audio_files, 1):
    print(f"\n[{i}/{len(audio_files)}] Processing:", f)
    try:
        meta = run_whisper_llm_on_file(f, clip_chars=BATCH_CLIP_CHARS)
        metas.append(meta)
        print("  -> OK | total:", meta["seconds_total"], "s | llm:", meta["seconds_llm"], "s | chars:", meta["transcript_chars"])
    except Exception as e:
        # Batch boundary: record the failure and continue with the next file
        failures.append({"file": str(f), "error": repr(e)})
        print("  -> FAIL:", repr(e))

# Summary files
summary_dir = RESULTS_DIR / "summaries" / VLLM_MODEL
summary_dir.mkdir(parents=True, exist_ok=True)
(summary_dir / "summary_meta.json").write_text(json.dumps(metas, indent=2, ensure_ascii=False), encoding="utf-8")
(summary_dir / "failures.json").write_text(json.dumps(failures, indent=2, ensure_ascii=False), encoding="utf-8")

print("\nDone.")
print("Success:", len(metas), "Failures:", len(failures))
print("Summary:", summary_dir)