# META transcription

Gabriel Bonnin

In [1]:
! pip install -r requirements.txt

# Für eventuellen Grafikkarten-Support:
! pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu128

In [2]:
from dotenv import load_dotenv
import os
import torch

# === Device Setup ===
if torch.backends.mps.is_available():  # Apple GPU (Metal)
    device = "mps"
    print("Apple GPU (Metal) erkannt – Whisper läuft auf der GPU.")
elif torch.cuda.is_available():  # NVIDIA GPU
    device = "cuda"
    print(f"NVIDIA GPU erkannt ({torch.cuda.get_device_name(0)}) – Whisper läuft auf der GPU.")
else:
    device = "cpu"
    print("Keine GPU erkannt – Whisper läuft auf dem CPU.")

# === Pfade ===
load_dotenv()
data_root = os.getenv("DATA_ROOT")
raw_audio_path = os.path.join(data_root, "raw/raw_audio")   # hier lagen die eingesprochenen Dateien
processed_path = os.path.join(data_root, "processed/processed_transcriptions")  # hier werden die CSV mit den transkribierten Daten, sowie ein Log-File gespeichern
os.makedirs(processed_path, exist_ok=True)

processed_log_path = os.path.join(processed_path, "processed_log.txt")
output_csv = os.path.join(processed_path, "Transcriptions.csv")

if data_root is None:
    raise EnvironmentError("DATA_ROOT ist nicht gesetzt! Bitte .env anlegen.")

# === Whisper Modell Set-Up ===
model_name = "large-v2"  # Je nach GPU Speichergröße bzw. Gerät auf dem der Code läuft. Auswahlmöglichkeiten: "tiny", "base", "small", "medium", "large", "large-v2", "large-v3"

Apple GPU (Metal) erkannt – Whisper läuft auf der GPU.

In [None]:
import os
import pandas as pd
import re
import whisper
import torch
import librosa
from rapidfuzz import process, fuzz

# === Helper: bereits verarbeitete Dateien laden ===
processed_files = set()
if os.path.exists(processed_log_path):
    with open(processed_log_path, "r", encoding="utf-8") as f:
        for line in f:
            path = line.strip()
            if path:
                path = os.path.abspath(path).replace("\\", "/")  # Normalisieren
                processed_files.add(path)

# === GPU Cache leeren (falls nötig) ===
if device == "cuda":
    torch.cuda.empty_cache()

# === Whisper-Modell laden auf passendem Device ===
model = whisper.load_model(model_name)

# === NATO-Alphabet ===
NATO_CANON = {
    "alpha": "A", "bravo": "B", "charlie": "C", "delta": "D", "echo": "E",
    "foxtrot": "F", "golf": "G", "hotel": "H", "india": "I", "juliett": "J",
    "kilo": "K", "lima": "L", "mike": "M", "november": "N", "oscar": "O",
    "papa": "P", "quebec": "Q", "romeo": "R", "sierra": "S", "tango": "T",
    "uniform": "U", "victor": "V", "whiskey": "W", "xray": "X", "yankee": "Y", "zulu": "Z"
}

# Häufige Varianten/ASR-Fehler -> canonical key
NATO_ALIASES = {
    "whisky": "whiskey",
    "juliet": "juliett",
    "x-ray": "xray",
    "x ray": "xray",
    "xray": "xray",
    "alfa": "alpha",       
    "sulu": "zulu",        
    "zoulou": "zulu", 
    "oskar": "oscar",
    "maik": "mike",
    "meik": "mike",
    "mic": "mike",     
}

NATO_KEYS = list(NATO_CANON.keys())

def normalize_nato_word(raw: str, min_score: int = 85):
    """
    Normalisiert ein transkribiertes NATO-Wort auf einen canonical key (z.B. 'whiskey' -> 'whisky').
    Fällt zurück auf fuzzy matching, wenn kein Alias passt.
    """
    if not raw:
        return None

    w = raw.lower().strip()
    # Nur Buchstaben/Leerzeichen/Bindestrich behalten (für x-ray, x ray, etc.)
    w = re.sub(r"[^a-z\- ]+", "", w)
    w = re.sub(r"\s+", " ", w).strip()

    # 1) Harte Aliases
    if w in NATO_ALIASES:
        w = NATO_ALIASES[w]
        return w

    # 2) Direkt korrekt
    if w in NATO_CANON:
        return w

    # 3) Fuzzy Matching
    match = process.extractOne(w, NATO_KEYS, scorer=fuzz.WRatio)
    if match and match[1] >= min_score:
        return match[0]

    return None

# A) Zahlwörter für Fragenummern
NUM_WORDS_Q = {
    # 0-19
    "null": "0",
    "eins": "1",
    "zwei": "2",
    "drei": "3",
    "vier": "4",
    "fünf": "5",
    "sechs": "6",
    "sieben": "7",
    "acht": "8",
    "neun": "9",
    "zehn": "10",
    "elf": "11",
    "zwölf": "12",
    "dreizehn": "13",
    "sechzig": "60",
    "einundsechzig": "61",
    "zweiundsechzig": "62",
}

# B) Zahlwörter für Code-Ziffernfolgen: nur 0-9 (damit du nicht ungewollt Fließtext veränderst)
NUM_WORDS_DIGITS = {
    "null": "0",
    "eins": "1",
    "zwei": "2",
    "drei": "3",
    "vier": "4",
    "fünf": "5",
    "sechs": "6",
    "sieben": "7",
    "acht": "8",
    "neun": "9",
}

def normalize_question_numbers_de(t: str) -> str:
    """
    Normalisiert 'Frage <zahlwort>' -> 'Frage <zahl>' (inkl. bis 62).
    """
    # z.B. "Frage Nummer fünf" / "Frage Nr. fünf" / "Frage fünf"
    pattern = re.compile(r'(?i)\bFrage(?:\s+Nr\.?|\s+Nummer)?\s+([A-Za-zäöüÄÖÜß]+)\b')

    def repl(m):
        w = m.group(1).lower()
        return f"Frage {NUM_WORDS_Q.get(w, w)}"

    return pattern.sub(repl, t)

def normalize_digit_sequences_de(t: str) -> str:
    """
    Ersetzt isolierte Zahlwörter 0-9 durch Ziffern.
    Das hilft v.a. bei 6-stelligen Code-Folgen, die als 'eins zwei drei ...' eingesprochen werden.
    """
    keys = sorted(NUM_WORDS_DIGITS.keys(), key=len, reverse=True)
    pattern = re.compile(r'(?i)\b(' + "|".join(map(re.escape, keys)) + r')\b')

    def repl(m):
        return NUM_WORDS_DIGITS[m.group(1).lower()]

    return pattern.sub(repl, t)

def normalize_transcript_for_extraction(text: str) -> str:
    x = text
    x = normalize_question_numbers_de(x)
    x = normalize_digit_sequences_de(x)
    return x

# === Erwartete Fragen ===
expected_markers = [
    "Frage 5", "Frage 6", "Frage 7", "Frage 8", "Frage 9", 
    "Frage 10", "Frage 11", "Frage 12", "Frage 13", 
    "Frage 40", "Frage 60", "Frage 61", "Frage 62"
]

def clean_answer(answer: str) -> str:
    answer = re.sub(r'(?i)^\s*hier folgt die antwort auf\s*', '', answer)
    answer = re.sub(r'(?i)\s*hier folgt die antwort auf\s*$', '', answer)
    return answer.strip(" .,-")

def robust_extract_questions(text: str, expected_markers: list) -> dict:
    marker_pattern = re.compile(r'(Frage\s*\d+)', re.IGNORECASE)
    matches = list(marker_pattern.finditer(text))
    answers = {marker: None for marker in expected_markers}

    def normalize_marker(m):
        return re.sub(r'\s+', ' ', m.group()).strip().capitalize()
    
    extracted = []
    for i, m in enumerate(matches):
        marker_text = normalize_marker(m)
        start_index = m.end()
        end_index = matches[i + 1].start() if i + 1 < len(matches) else len(text)
        extracted.append((marker_text, start_index, end_index))

    # NEU: kein break -> letzte Version überschreibt frühere
    for exp_marker in expected_markers:
        for marker_text, start, end in extracted:
            if marker_text.lower() == exp_marker.lower():
                answer = clean_answer(text[start:end].strip())
                answers[exp_marker] = answer
                # kein break -> spätere Treffer überschreiben frühere
    return answers

# === Alle neuen Audio-Dateien sammeln ===
audio_files = []
for root, _, files in os.walk(raw_audio_path):
    for f in files:
        if f.lower().endswith(('.wav', '.mp3', '.m4a', '.flac')) and not f.startswith('._'):
            full_path = os.path.abspath(os.path.join(root, f)).replace("\\", "/")  # Normalisieren
            if full_path not in processed_files:
                audio_files.append(full_path)

print(f"{len(audio_files)} neue Dateien gefunden.")

# === Falls schon CSV existiert, laden ===
if os.path.exists(output_csv):
    df = pd.read_csv(output_csv)
    data = df.to_dict("records")
else:
    data = []

# === Neue Dateien verarbeiten ===
for audio_path in audio_files:
    audio_path = os.path.abspath(audio_path).replace("\\", "/")

    # print(f"Processing: {audio_path}")  # Un-kommentieren, um Pfad sichtbar zu machen

    # --- Transkription: WAV direkt mit librosa laden ---
    audio, sr = librosa.load(audio_path, sr=16000)  # resample auf 16 kHz
    result = model.transcribe(audio, language="de")
    text = result['text']

    # --- Normalisierung für robuste Extraktion (Zahlwörter etc.)
    text_norm = normalize_transcript_for_extraction(text)

    # --- Chiffre-Extraktion robust (NATO Aliases + Fuzzy)

    # NATO-Wort (auch x-ray / x ray) + 6 Ziffern (mit Leerzeichen/Kommas erlaubt)
    code_match = re.search(r'\b([A-Za-zÄÖÜäöüß\- ]+?)\s*[:;,]?\s*([0-9][0-9\s,\.\-\/]{4,}[0-9])\b', text_norm)
    code = None
    if code_match:
        nato_raw = code_match.group(1)
        digit_blob = code_match.group(2)

        # Alle Nicht-Ziffern raus => nur digits
        digits = re.sub(r'\D+', '', digit_blob)

        nato_norm = normalize_nato_word(nato_raw)
        if nato_norm and len(digits) == 6:
            code = f"{NATO_CANON[nato_norm]}{digits}"

    # --- Fragen-Extraktion (auf normalisiertem Text) ---
    question_answers = robust_extract_questions(text_norm, expected_markers)
    # Ergebnis speichern
    entry = {"file_path": audio_path, "code": code, "full_transcript": text}
    entry.update({f"q{m.split()[1]}": question_answers.get(m) for m in expected_markers})
    data.append(entry)

    # --- CSV aktualisieren ---
    df = pd.DataFrame(data)
    df.to_csv(output_csv, index=False, encoding="utf-8")

    # --- Log aktualisieren ---
    with open(processed_log_path, "a", encoding="utf-8") as log:
        log.write(audio_path + "\n")

# === Abschlussmeldung ===
print("Fertig mit der Verarbeitung aller neuen Dateien.")