# Convert Parquet → WAV (16 kHz, mono) per data-prep-plan-ru.md

- Source: `Thorsten-Voice/TV-44kHz-Full` (HF streaming).
- Output: WAV 16 kHz mono + peak normalization; optional silence trim.
- Saves to `data_wav/` and writes updated metadata (CSV/Parquet) with WAV paths.
- Idempotent: skips existing WAVs and resumes from checkpoint.

In [20]:
!pip install -q --upgrade datasets soundfile pandas tqdm pyarrow librosa

In [21]:
from pathlib import Path
import io
import json
import numpy as np
import pandas as pd
import soundfile as sf
import librosa
from datasets import Audio, load_dataset
from tqdm.auto import tqdm

# --- Source dataset -----------------------------------------------------
REPO_ID = "Thorsten-Voice/TV-44kHz-Full"
SPLIT = "train"

# --- Audio output settings ----------------------------------------------
TARGET_SR = 16_000       # sample rate of the written WAV files, in Hz
TARGET_PEAK = 0.98       # peak-normalize so the loudest sample sits at 98% of full scale
TRIM_SILENCE = False     # True: cut leading/trailing samples at or below TRIM_THRESHOLD
TRIM_THRESHOLD = 0.001   # absolute-amplitude threshold, only used when TRIM_SILENCE is True

# --- Bookkeeping --------------------------------------------------------
CHUNK_ROWS = 500         # metadata rows buffered between CSV/checkpoint writes

# --- Paths (all relative to the current working directory) --------------
BASE_DIR = Path.cwd()
SRC_META_CSV = BASE_DIR.joinpath("data_tv_44khz_full", "tv_44khz_full_metadata.csv")
WAV_DIR = BASE_DIR.joinpath("data_wav")
WAV_META_CSV = WAV_DIR.joinpath("tv_44khz_full_metadata_wav.csv")
WAV_META_PARQUET = WAV_DIR.joinpath("tv_44khz_full_metadata_wav.parquet")
CHECKPOINT = WAV_DIR.joinpath("wav_checkpoint.json")

# The output directory must exist before any WAV, metadata or checkpoint write.
WAV_DIR.mkdir(parents=True, exist_ok=True)

print("BASE_DIR", BASE_DIR)
print("SRC_META_CSV exists", SRC_META_CSV.exists())

BASE_DIR /Volumes/SSanDisk/SpeechRec-German
SRC_META_CSV exists True


In [22]:
# Checkpoint helpers
def load_checkpoint():
    """Return the last processed index stored in CHECKPOINT.

    Returns -1 when the checkpoint file does not exist, cannot be read or
    parsed, or does not contain a "last_index" entry.
    """
    if not CHECKPOINT.exists():
        return -1
    try:
        payload = json.loads(CHECKPOINT.read_text())
        return int(payload.get("last_index", -1))
    except Exception:
        # Best effort: an unreadable or malformed checkpoint counts as "no checkpoint".
        return -1

def save_checkpoint(last_index: int):
    """Persist the index of the last processed row to CHECKPOINT.

    The JSON payload is first written to a sibling temporary file and then
    renamed over CHECKPOINT, so an interruption during the write cannot leave
    a truncated checkpoint behind.

    Args:
        last_index: Index of the last row that has been processed.
    """
    payload = json.dumps({"last_index": int(last_index)})
    tmp_path = CHECKPOINT.with_name(CHECKPOINT.name + ".tmp")
    tmp_path.write_text(payload, encoding="utf-8")
    # Path.replace overwrites an existing destination.
    tmp_path.replace(CHECKPOINT)

# Audio transforms
def to_mono(audio_np: np.ndarray):
    """Collapse a multi-channel array to one channel by averaging.

    A 1-D array is returned unchanged. Otherwise the mean is taken along
    axis 0 when the first dimension is smaller than the second, and along
    axis 1 in every other case (including equal dimensions).
    """
    if audio_np.ndim == 1:
        return audio_np
    first_dim, second_dim = audio_np.shape[0], audio_np.shape[1]
    # Heuristic: the shorter of the two leading dimensions is treated as channels.
    channel_axis = 0 if first_dim < second_dim else 1
    return audio_np.mean(axis=channel_axis)

def normalize_peak(audio_np: np.ndarray, target_peak: float = TARGET_PEAK):
    """Scale the signal so its largest absolute sample equals target_peak.

    Empty or all-zero input is not scaled. The result is always clipped to
    the range [-1.0, 1.0] before being returned.
    """
    peak = 0.0
    if audio_np.size:
        peak = np.max(np.abs(audio_np))
    if peak > 0:
        gain = target_peak / peak
        audio_np = audio_np * gain
    return np.clip(audio_np, -1.0, 1.0)

def trim_silence(audio_np: np.ndarray, threshold: float = TRIM_THRESHOLD):
    """Drop leading and trailing samples whose magnitude is at or below threshold.

    The input is returned untouched when TRIM_SILENCE is off, when the array
    is empty, or when no sample exceeds the threshold.
    """
    trimming_applies = TRIM_SILENCE and audio_np.size != 0
    if not trimming_applies:
        return audio_np
    # Indices (along the first axis) of samples louder than the threshold.
    loud_idx = np.nonzero(np.abs(audio_np) > threshold)[0]
    if loud_idx.size == 0:
        return audio_np  # nothing above the threshold: keep the signal as-is
    first_loud, last_loud = loud_idx[0], loud_idx[-1]
    return audio_np[first_loud: last_loud + 1]

def prepare_audio(audio_np: np.ndarray):
    """Run the transform chain: to_mono, then trim_silence, then normalize_peak."""
    return normalize_peak(trim_silence(to_mono(audio_np)))

# Metadata row builder
def meta_row(idx: int, row: dict, wav_path: Path):
    """Build the metadata record for one converted item.

    Fields are copied from row with .get (missing keys become None), except
    that "samplerate" is always TARGET_SR, the source key
    "recording_year-month" is stored as "recording_year_month", and
    "audio_wav_path" holds wav_path as a string. Key order is fixed because
    it determines the column order of the metadata CSV.
    """
    record = {"idx": idx}
    for key in ("id", "subset", "style", "text"):
        record[key] = row.get(key)
    record["samplerate"] = TARGET_SR
    record["durationSeconds"] = row.get("durationSeconds")
    record["recording_year_month"] = row.get("recording_year-month")
    for key in ("microphone", "language", "comment"):
        record[key] = row.get(key)
    record["audio_wav_path"] = str(wav_path)
    return record

In [23]:
# Determine the resume point: the larger of "checkpoint + 1" and the number of
# metadata rows already written to the CSV.
start_from_cp = load_checkpoint() + 1
start_from_meta = 0
if WAV_META_CSV.exists():
    try:
        # Parse the CSV instead of counting physical lines: a quoted field that
        # contains a newline spans several lines but is still one record, and
        # pandas closes the file for us.
        start_from_meta = len(pd.read_csv(WAV_META_CSV, usecols=["idx"]))
    except Exception as exc:
        # Best effort: fall back to the checkpoint alone, but say so.
        print(f"Could not read {WAV_META_CSV} ({exc!r}); ignoring it for resume.")
        start_from_meta = 0
start_from = max(start_from_cp, start_from_meta, 0)
print(f"Resuming from index: {start_from}")

Resuming from index: 0


In [24]:
# Stream dataset audio without auto-decoding (decode=False)
ds = load_dataset(REPO_ID, name="all", split=SPLIT, streaming=True)
ds = ds.cast_column("audio", Audio(decode=False))


def _flush_metadata(rows: list) -> None:
    """Append the buffered metadata rows to WAV_META_CSV and empty the buffer."""
    if not rows:
        return
    # Write the header only when the CSV is being created.
    header = not WAV_META_CSV.exists()
    pd.DataFrame(rows).to_csv(WAV_META_CSV, mode="a", header=header, index=False)
    rows.clear()


def _convert_to_wav(row: dict, out_path: Path) -> None:
    """Decode one streamed row and write it to out_path as a TARGET_SR mono WAV."""
    audio_bytes = row["audio"].get("bytes")
    if audio_bytes is None:
        # if only a path is provided (streaming), read bytes from it
        with open(row["audio"]["path"], "rb") as fh:
            audio_bytes = fh.read()
    data, sr = sf.read(io.BytesIO(audio_bytes), dtype="float32")
    # Multi-channel data is averaged across axis 1 before resampling.
    audio_arr = data if data.ndim == 1 else data.mean(axis=1)
    if sr != TARGET_SR:
        audio_arr = librosa.resample(audio_arr, orig_sr=sr, target_sr=TARGET_SR)
    audio_arr = prepare_audio(np.asarray(audio_arr))
    out_path.parent.mkdir(parents=True, exist_ok=True)
    # Write to a temporary name and rename afterwards: an interrupted write must
    # not leave a truncated file at out_path, because an existing out_path is
    # treated as "already converted" on the next run. The format is given
    # explicitly since the temporary name does not end in ".wav".
    tmp_path = out_path.with_name(out_path.name + ".part")
    sf.write(tmp_path, audio_arr, TARGET_SR, format="WAV")
    tmp_path.replace(out_path)


buffer = []
last_index = start_from - 1
# No `initial=start_from` here: rows before start_from are still iterated (and
# skipped below), so tqdm already counts them; passing `initial` as well would
# count them twice on a resumed run.
progress = tqdm(ds, desc="parquet->wav")

for idx, row in enumerate(progress):
    if idx < start_from:
        continue

    subset = row.get("subset")
    if subset is None:
        # Covers both a missing key and an explicit null value.
        subset = "unknown"
    out_path = WAV_DIR / subset / f"{row['id']}.wav"
    if not out_path.exists():
        _convert_to_wav(row, out_path)
    buffer.append(meta_row(idx, row, out_path))

    last_index = idx
    if len(buffer) >= CHUNK_ROWS:
        # Metadata first, checkpoint second: the resume logic takes the larger
        # of the two, so a crash in between does not duplicate rows.
        _flush_metadata(buffer)
        save_checkpoint(last_index)

# final flush
_flush_metadata(buffer)

save_checkpoint(last_index)
print(f"Done. Last index: {last_index}")

Some datasets params were ignored: ['homepage', 'license']. Make sure to use only valid params for the dataset builder and to have a up-to-date version of the `datasets` library.
parquet->wav: 39248it [16:12, 40.37it/s] 

Done. Last index: 39247





In [25]:
# Mirror the accumulated CSV metadata as a Parquet file.
if WAV_META_CSV.exists():
    df = pd.read_csv(WAV_META_CSV)
    df.to_parquet(WAV_META_PARQUET, index=False)
    row_count = len(df)
    print(f"Wrote {row_count} rows to {WAV_META_PARQUET}")

# Keep a copy of the original metadata next to the WAVs (for comparison);
# an existing copy is never overwritten.
dst = WAV_DIR.joinpath(SRC_META_CSV.name)
if SRC_META_CSV.exists() and not dst.exists():
    source_bytes = SRC_META_CSV.read_bytes()
    dst.write_bytes(source_bytes)
    print(f"Copied source metadata to {dst}")

Wrote 39248 rows to /Volumes/SSanDisk/SpeechRec-German/data_wav/tv_44khz_full_metadata_wav.parquet
Copied source metadata to /Volumes/SSanDisk/SpeechRec-German/data_wav/tv_44khz_full_metadata.csv
