In [2]:
import os
import csv

# Folder with your .txt transcriptions
txt_dir = r"C:\PY\Internship\mcv-spontaneous-de-v1.0\sps-corpus-1.0-2025-09-05-de\audios\transcriptions"

# Output CSV path (in the same folder)
csv_path = os.path.join(txt_dir, "transcriptions.csv")

rows = []

for fname in sorted(os.listdir(txt_dir)):
    if not fname.lower().endswith(".txt"):
        continue

    file_id = os.path.splitext(fname)[0]  # filename without extension
    file_path = os.path.join(txt_dir, fname)

    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read().strip()

    rows.append({"id": file_id, "text": text})

# Write CSV
with open(csv_path, "w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=["id", "text"])
    writer.writeheader()
    writer.writerows(rows)

print(f"Wrote {len(rows)} rows to {csv_path}")


Wrote 16 rows to C:\PY\Internship\mcv-spontaneous-de-v1.0\sps-corpus-1.0-2025-09-05-de\audios\transcriptions\transcriptions.csv


In [3]:
import os
import re

folder = r"C:\PY\Internship\mcv-spontaneous-de-v1.0\sps-corpus-1.0-2025-09-05-de\audios"

# digits at the end of the basename, e.g. "spontaneous-speech-de-71030" -> "71030"
pattern = re.compile(r"^(?:.*?)-(\d+)$")

for fname in os.listdir(folder):
    if not fname.lower().endswith(".mp3"):
        continue  # skip non-mp3 files

    old_path = os.path.join(folder, fname)
    if not os.path.isfile(old_path):
        continue

    base, ext = os.path.splitext(fname)
    m = pattern.match(base)
    if not m:
        print("Skipping (no trailing digits pattern):", fname)
        continue

    new_base = m.group(1)          # the digits at the end
    new_name = new_base + ext      # e.g. "71030.mp3"
    new_path = os.path.join(folder, new_name)

    if os.path.exists(new_path):
        print(f"Skipping {fname} -> {new_name}, target already exists")
        continue

    os.rename(old_path, new_path)
    print(f"Renamed {fname} -> {new_name}")


Renamed spontaneous-speech-de-71030.mp3 -> 71030.mp3
Renamed spontaneous-speech-de-71096.mp3 -> 71096.mp3
Renamed spontaneous-speech-de-71253.mp3 -> 71253.mp3
Renamed spontaneous-speech-de-71254.mp3 -> 71254.mp3
Renamed spontaneous-speech-de-71255.mp3 -> 71255.mp3
Renamed spontaneous-speech-de-71256.mp3 -> 71256.mp3
Renamed spontaneous-speech-de-71257.mp3 -> 71257.mp3
Renamed spontaneous-speech-de-71258.mp3 -> 71258.mp3
Renamed spontaneous-speech-de-71259.mp3 -> 71259.mp3
Renamed spontaneous-speech-de-71260.mp3 -> 71260.mp3
Renamed spontaneous-speech-de-71261.mp3 -> 71261.mp3
Renamed spontaneous-speech-de-71262.mp3 -> 71262.mp3
Renamed spontaneous-speech-de-71263.mp3 -> 71263.mp3
Renamed spontaneous-speech-de-71264.mp3 -> 71264.mp3
Renamed spontaneous-speech-de-71265.mp3 -> 71265.mp3
Renamed spontaneous-speech-de-71266.mp3 -> 71266.mp3
Renamed spontaneous-speech-de-71267.mp3 -> 71267.mp3
Renamed spontaneous-speech-de-71268.mp3 -> 71268.mp3
Renamed spontaneous-speech-de-71269.mp3 -> 712

In [3]:
import os
import torch
import whisperx
import csv

# === 1) PATHS ===
audio_dir = r"C:\PY\Internship\mcv-spontaneous-de-v1.0\sps-corpus-1.0-2025-09-05-de\audios"
base_out_dir = r"C:\PY\Internship"
os.makedirs(base_out_dir, exist_ok=True)

out_csv = os.path.join(base_out_dir, "transcriptions.csv")

# === 2) LOAD EXISTING IDS (IF CSV ALREADY EXISTS) ===
existing_ids = set()
append_mode = False

if os.path.exists(out_csv):
    print(f"Found existing CSV at {out_csv}, loading IDs...")
    with open(out_csv, "r", encoding="utf-8", newline="") as f:
        reader = csv.DictReader(f)
        for row in reader:
            if "id" in row:
                existing_ids.add(row["id"])
    append_mode = True
    print(f"Loaded {len(existing_ids)} existing IDs.")
else:
    print("No existing CSV found. Will create a new one.")
    append_mode = False

# === 3) WHISPERX MODEL SETUP ===
device = "cuda" if torch.cuda.is_available() else "cpu"
batch_size = 16  # reduce if needed
compute_type = "float16" if device == "cuda" else "float32"

print(f"Using device: {device}, compute_type: {compute_type}")
model = whisperx.load_model("large-v2", device, compute_type=compute_type)

# === 4) OPEN CSV (APPEND OR CREATE) AND PROCESS MP3s ===
mode = "a" if append_mode else "w"
with open(out_csv, mode, encoding="utf-8", newline="") as f:
    writer = csv.writer(f)

    # If we're creating a new file, write header
    if not append_mode:
        writer.writerow(["id", "transcription"])

    # Loop over all MP3s
    for fname in os.listdir(audio_dir):
        if not fname.lower().endswith(".mp3"):
            continue

        audio_path = os.path.join(audio_dir, fname)

        # Filename is just the ID now: e.g. "71030.mp3" -> "71030"
        id_part = os.path.splitext(fname)[0]

        # Skip if this ID is already in the CSV
        if id_part in existing_ids:
            print(f"[SKIP] {fname} (ID {id_part} already in CSV)")
            continue

        print(f"[TRANSCRIBE] {fname} (ID: {id_part})")

        # Transcribe with WhisperX
        result = model.transcribe(audio_path, batch_size=batch_size)

        # Combine segment texts into one string
        full_text = " ".join(seg["text"].strip() for seg in result["segments"])

        # Write new row to CSV
        writer.writerow([id_part, full_text])
        f.flush()  # make sure it's written

        # Remember that we've now processed this ID
        existing_ids.add(id_part)

print("Done. Incremental transcription finished.")


Found existing CSV at C:\PY\Internship\transcriptions.csv, loading IDs...
Loaded 16 existing IDs.
Using device: cpu, compute_type: float32


  torchaudio.list_audio_backends()
  available_backends = torchaudio.list_audio_backends()


2025-11-19 13:53:03 - whisperx.asr - INFO - No language specified, language will be detected for each audio file (increases inference time)
2025-11-19 13:53:03 - whisperx.vads.pyannote - INFO - Performing voice activity detection using Pyannote...


  if ismodule(module) and hasattr(module, '__file__'):
Lightning automatically upgraded your loaded checkpoint from v1.5.4 to v2.5.6. To apply the upgrade to your files permanently, run `python -m pytorch_lightning.utilities.upgrade_checkpoint C:\Users\david\AppData\Roaming\Python\Python313\site-packages\whisperx\assets\pytorch_model.bin`
  torchaudio.list_audio_backends()


Model was trained with pyannote.audio 0.0.1, yours is 3.4.0. Bad things might happen unless you revert pyannote.audio to 0.x.
Model was trained with torch 1.10.0+cu102, yours is 2.8.0+cpu. Bad things might happen unless you revert torch to 1.x.
[SKIP] 71030.mp3 (ID 71030 already in CSV)
[SKIP] 71096.mp3 (ID 71096 already in CSV)
[SKIP] 71253.mp3 (ID 71253 already in CSV)
[SKIP] 71254.mp3 (ID 71254 already in CSV)
[SKIP] 71255.mp3 (ID 71255 already in CSV)
[SKIP] 71256.mp3 (ID 71256 already in CSV)
[SKIP] 71257.mp3 (ID 71257 already in CSV)
[SKIP] 71258.mp3 (ID 71258 already in CSV)
[SKIP] 71259.mp3 (ID 71259 already in CSV)
[SKIP] 71260.mp3 (ID 71260 already in CSV)
[SKIP] 71261.mp3 (ID 71261 already in CSV)
[SKIP] 71262.mp3 (ID 71262 already in CSV)
[SKIP] 71263.mp3 (ID 71263 already in CSV)
[SKIP] 71264.mp3 (ID 71264 already in CSV)
[SKIP] 71265.mp3 (ID 71265 already in CSV)
[SKIP] 71266.mp3 (ID 71266 already in CSV)
[TRANSCRIBE] 71267.mp3 (ID: 71267)
2025-11-19 13:53:43 - whisperx.a

In [None]:
import csv
from phonemizer import phonemize
from phonemizer.separator import Separator
from phonemizer.backend.espeak.wrapper import EspeakWrapper

# Point to your eSpeak NG DLL
EspeakWrapper.set_library(r"C:\Program Files\eSpeak NG\libespeak-ng.dll")

# Input and output paths
in_csv = r"C:\PY\Internship\transcriptions.csv"
out_csv = r"C:\PY\Internship\transcriptions_ipa.csv"

rows = []
texts = []

# --- 1) Read existing CSV ---
with open(in_csv, "r", encoding="utf-8", newline="") as f:
    reader = csv.DictReader(f)
    fieldnames = reader.fieldnames or []
    if "text" not in fieldnames:
        raise ValueError("Expected a 'text' column in the CSV.")

    for row in reader:
        rows.append(row)
        texts.append(row["text"])

print(f"Loaded {len(rows)} rows from {in_csv}")

# --- 2) Phonemize all texts in one go ---
# If you *don’t* want word separators, use: Separator(phone=' ', word=' ')
separator = Separator(phone=' ', word='|')

ipas = phonemize(
    texts,
    language="de",          # German
    backend="espeak",       # via eSpeak NG
    separator=separator,
    strip=True,
    preserve_punctuation=False,
    with_stress=False,       # set False if you don’t want ˈ stress marks
    njobs=1
)

# --- 3) Attach clean IPA into a single 'ipa' column ---
for row, ipa in zip(rows, ipas):
    row["ipa"] = ipa  # overwrite any existing ipa

# --- 4) Decide header: reuse existing columns, add ipa if missing ---
if "ipa" in fieldnames:
    out_fieldnames = fieldnames
else:
    out_fieldnames = fieldnames + ["ipa"]

with open(out_csv, "w", encoding="utf-8", newline="") as f:
    writer = csv.DictWriter(f, fieldnames=out_fieldnames)
    writer.writeheader()
    writer.writerows(rows)

print(f"Wrote IPA-augmented CSV to {out_csv}")


Loaded 35 rows from C:\PY\Internship\transcriptions.csv
Wrote IPA-augmented CSV to C:\PY\Internship\transcriptions_ipa.csv
