# Runpod: HF → Canary (NeMo) — PyTorch 2.8 / CUDA 12.8

Блокнот для Runpod (PyTorch 2.8 / CUDA 12.8). Parquet не используется: аудио экспортируется в WAV (16 кГц, моно), метаданные — в JSONL-манифест. Исправлен импорт NeMo: перед установкой и импортом nemo_toolkit[asr] версии NumPy, SciPy, Lightning и TorchMetrics приводятся к совместимым диапазонам.


# Проверка окружения


In [None]:
import os, sys, subprocess, platform
import torch

# Environment report: interpreter, OS, PyTorch build and GPU visibility.
_report_rows = (
    ("Python :", sys.version),
    ("Platform:", platform.platform()),
    ("Torch  :", torch.__version__, "| CUDA:", torch.version.cuda),
    ("CUDA visible:", os.environ.get("CUDA_VISIBLE_DEVICES")),
)
for _row in _report_rows:
    print(*_row)

# nvidia-smi is optional here: any failure to run it is reported, not raised.
try:
    subprocess.run(["nvidia-smi"], check=True)
except Exception as smi_error:
    print("[WARN] nvidia-smi not available:", smi_error)


# Конфиг


In [None]:
from pathlib import Path
import os, re

# --- HF token: provide it through the container's environment variables ---
# os.environ["HF_TOKEN"] = "hf_XXXXXXXXXXXXXXXXXXXXXXXXXXXX"
# os.environ["HUGGINGFACE_HUB_TOKEN"] = os.environ.get("HF_TOKEN","")

# Working directories, all rooted at the current working directory.
ROOT = Path.cwd()
DATA_DIR = ROOT.joinpath("data_wav")
OUT_DIR = ROOT.joinpath("filtered_datasets")
TMP_DIR = ROOT.joinpath(".tmp")
HF_HOME_DIR = ROOT.joinpath(".hf")

for _workdir in (DATA_DIR, OUT_DIR, TMP_DIR, HF_HOME_DIR):
    _workdir.mkdir(parents=True, exist_ok=True)

# Point the Hugging Face caches and temp dirs at the local folders, but only
# for variables that are not already set in the environment.
for _var, _location in (
    ("HF_HOME", HF_HOME_DIR),
    ("HF_HUB_CACHE", HF_HOME_DIR / "hub"),
    ("TRANSFORMERS_CACHE", HF_HOME_DIR / "transformers"),
    ("TMP", TMP_DIR),
    ("TEMP", TMP_DIR),
):
    os.environ.setdefault(_var, str(_location))

# Model selection: a truthy NEMO_PATH is restored from file instead of
# downloading MODEL_ID.
MODEL_ID = "nvidia/canary-1b-v2"
NEMO_PATH = None

# Arguments forwarded to the model's transcribe() call.
SOURCE_LANG = "ru"
TARGET_LANG = "ru"
TASK = "asr"
USE_PNC = True
BATCH_SIZE = 16

# Clip duration bounds (seconds) applied while exporting WAV files.
MIN_DUR = 1.0
MAX_DUR = 35.0

# Error-rate ceilings for the filtered pool.
CER_MAX = 0.15
WER_MAX = 0.50

# WER quantiles that delimit the "core" and "hard" buckets.
Q_CORE = 0.60
Q_HARD = 0.95

# Hugging Face dataset repositories to process.
LINKS = [
    "bond005/taiga_speech_v2",
    "bond005/rulibrispeech",
    "bond005/podlodka_speech",
    "bond005/audioset-nonspeech",
    "mozilla-foundation/common_voice_17_0",
    "google/fleurs",
]

# Splits requested from every repository.
SPLITS = ["train", "validation", "test"]

# Matches language tags that denote Russian ("ru", "ru-RU", "russian", ...).
RUPAT = re.compile(r"(^|[-_])ru([-_]|$)|russian", re.IGNORECASE)


# Установка базовых зависимостей (без переустановки torch)


In [None]:
import sys, subprocess

def pip_install(args):
    """Run ``pip install -U`` with the given arguments.

    pip is invoked as ``sys.executable -m pip``, i.e. with the interpreter
    that is executing this code.

    Args:
        args: Iterable of strings appended after ``pip install -U``
            (requirement specifiers and/or extra pip options).

    Raises:
        subprocess.CalledProcessError: If pip exits with a non-zero status.
    """
    # Function-scope import keeps the helper self-contained within its cell.
    import shlex

    cmd = [sys.executable, "-m", "pip", "install", "-U", *args]
    # Echo a shell-safe command line: specifiers such as "pkg>=1.0" contain
    # characters (">", "<") that a shell would treat as redirections if the
    # echoed line were copied verbatim.
    print("+", shlex.join(cmd))
    subprocess.run(cmd, check=True)

# Minimum versions of the base (non-torch) packages installed by this cell.
_base_floors = {
    "datasets[audio]": "2.20.0",
    "huggingface_hub": "0.24",
    "soundfile": "0.12",
    "pydub": "0.25",
    "jiwer": "3.0.0",
    "pandas": "2.2.0",
    "tqdm": "4.66",
}
# Render each entry as a "name>=version" requirement specifier.
base = [f"{_pkg}>={_floor}" for _pkg, _floor in _base_floors.items()]
pip_install(base)

print("OK: базовые пакеты установлены")


# FIX: стек NumPy/SciPy/Lightning/TorchMetrics + установка NeMo


In [None]:
import sys, subprocess

def run(cmd):
    """Echo a command line, then execute it and fail on a non-zero exit.

    Args:
        cmd: Sequence of strings: the program followed by its arguments.
            It is handed to ``subprocess.run`` as a list (no shell).

    Raises:
        subprocess.CalledProcessError: If the command exits with a non-zero
            status.
    """
    # Function-scope import keeps the helper self-contained within its cell.
    import shlex

    # shlex.join quotes arguments such as "numpy>=2.2,<2.4", so the echoed
    # line can be pasted into a shell without ">" / "<" acting as redirects.
    print("+", shlex.join(cmd))
    subprocess.run(cmd, check=True)

# Upgrade the packaging toolchain (pip / setuptools / wheel) before the
# installs below.
run([sys.executable, "-m", "pip", "install", "-U", "pip>=24.2", "setuptools>=75", "wheel>=0.44"])

# Bring NumPy / SciPy / TorchMetrics / Lightning into explicit version ranges
# before NeMo is imported or installed.
run([sys.executable, "-m", "pip", "install", "-U",
     "numpy>=2.2,<2.4",
     "scipy>=1.14,<1.17",
     "torchmetrics>=1.5.2",
     "lightning>=2.5.2,<2.6"
])

# Install NeMo only when importing it fails for any reason (package missing
# or an import-time error); after the install the import is retried once and
# a second failure propagates.
try:
    from nemo.collections.asr.models import ASRModel
    import nemo
    print("NeMo already present:", nemo.__version__)
except Exception:
    run([sys.executable, "-m", "pip", "install", "-U", "nemo_toolkit[asr]==2.4.0"])
    from nemo.collections.asr.models import ASRModel
    import nemo

# Print the versions that actually got imported into this process.
import numpy, scipy, lightning, torchmetrics, nemo as _nemo
print("NumPy      :", numpy.__version__)
print("SciPy      :", scipy.__version__)
print("Lightning  :", lightning.__version__)
print("TorchMetrics:", torchmetrics.__version__)
print("NeMo       :", _nemo.__version__)

# Shim for `import numpy.char` in case some dependency expects that module
# path: when "numpy.char" is not registered in sys.modules, build a module
# object carrying every attribute listed by dir(numpy.char) and register it
# under that name.
# NOTE(review): if numpy ships `numpy.char` as a real submodule that simply
# has not been imported yet, this registers a copy in its place - confirm
# that is acceptable.
import types, sys as _sys
if "numpy.char" not in _sys.modules:
    import numpy as _np
    _mod = types.ModuleType("numpy.char")
    for k in dir(_np.char):
        setattr(_mod, k, getattr(_np.char, k))
    _sys.modules["numpy.char"] = _mod
    print("[shim] injected numpy.char module")

print("OK: NeMo стек готов")


# Хелперы: WAV + JSONL


In [None]:
import os, io, json, hashlib
from pathlib import Path
import numpy as np
import soundfile as sf
from datasets import load_dataset, Audio
from tqdm import tqdm

def sha1_name(s: str) -> str:
    """Return the hexadecimal SHA-1 digest of *s* encoded as UTF-8.

    Characters that cannot be encoded are dropped (``errors="ignore"``).
    """
    hasher = hashlib.sha1()
    hasher.update(s.encode("utf-8", errors="ignore"))
    return hasher.hexdigest()

def ensure_wav_mono16k(data, sr):
    """Convert audio samples to mono ``float32`` at 16 kHz.

    Args:
        data: Array-like samples. Input with more than one dimension is
            averaged over axis 1, i.e. it is treated as (samples, channels).
        sr: Sampling rate of ``data`` in Hz.

    Returns:
        Tuple ``(samples, 16000)`` where ``samples`` is an ``np.float32``
        array.

    Raises:
        RuntimeError: If resampling is required and fails, or neither librosa
            nor SciPy can be used. The original error is chained as the cause.
    """
    target_sr = 16000

    # Convert first so that plain (nested) lists expose ``ndim`` as well;
    # checking the attribute on the raw input skipped the down-mix for them.
    data = np.asarray(data)
    if data.ndim > 1:
        data = data.mean(axis=1)
    data = np.asarray(data, dtype=np.float32)

    if sr == target_sr:
        return data, target_sr

    # librosa stays the preferred resampler; a missing or broken install
    # falls back to SciPy instead of discarding the clip.
    try:
        import librosa
    except Exception:
        librosa = None

    try:
        if librosa is not None:
            data = librosa.resample(y=data, orig_sr=sr, target_sr=target_sr)
        else:
            from scipy.signal import resample_poly

            # resample_poly takes integer up/down factors.
            data = resample_poly(data, target_sr, int(sr))
    except Exception as err:
        raise RuntimeError(
            f"Cannot resample audio from {sr} Hz to {target_sr} Hz"
        ) from err

    return np.asarray(data, dtype=np.float32), target_sr

def save_wav(path: Path, data, sr: int):
    """Write *data* to *path* as a 16-bit PCM WAV file.

    Missing parent directories of *path* are created first.
    """
    target_dir = path.parent
    target_dir.mkdir(parents=True, exist_ok=True)
    sf.write(str(path), data, sr, format="WAV", subtype="PCM_16")

def _first_nonempty_field(row, keys):
    """Return the first truthy ``row[key]`` among *keys*, or ``None``."""
    for key in keys:
        if key in row and row[key]:
            return row[key]
    return None


def prepare_hf_dataset_to_wav(repo: str, split: str, out_root: Path, lang_regex, hf_token=None):
    """Export one Hugging Face dataset split to WAV files plus a JSONL manifest.

    Rows are kept only if they have a non-empty transcript, a language field
    (when present) matching *lang_regex*, audio that converts to 16 kHz mono,
    and a duration within the module-level ``MIN_DUR``..``MAX_DUR`` bounds.
    The manifest file is rewritten from scratch on every call.

    Args:
        repo: Dataset repository id passed to ``load_dataset``.
        split: Split name passed to ``load_dataset``.
        out_root: Directory under which ``<repo>_<split>/`` is created.
        lang_regex: Compiled pattern searched in the row's language value.
        hf_token: Optional token forwarded to ``load_dataset`` as ``token``.

    Returns:
        ``None`` if the split could not be loaded, otherwise a dict with the
        keys ``name``, ``manifest``, ``dir`` and ``kept``.
    """
    text_keys = ("text", "sentence", "transcript", "transcription", "label", "target")
    lang_keys = ("lang", "language", "source_lang", "locale")

    kwargs = {}
    if hf_token:
        kwargs["token"] = hf_token
    try:
        ds = load_dataset(repo, split=split, streaming=False, **kwargs)
    except Exception as e:
        print(f"[skip] {repo}:{split} → {e}")
        return None

    # Best effort: ask the dataset to decode audio at 16 kHz. A failure is
    # reported (it used to be silent) and the rows are converted one by one.
    try:
        ds = ds.cast_column("audio", Audio(sampling_rate=16000))
    except Exception as e:
        print(f"[warn] {repo}:{split}: cast of 'audio' column to 16 kHz failed → {e}")

    name = f"{repo.replace('/','___')}_{split}"
    out_dir = out_root / name
    audio_dir = out_dir / "audio"
    out_dir.mkdir(parents=True, exist_ok=True)
    manifest = out_dir / "manifest.jsonl"

    kept = 0
    failed = 0  # rows dropped because audio conversion raised
    with manifest.open("w", encoding="utf-8") as fo:
        for i, row in enumerate(tqdm(ds, desc=f"{repo}:{split}")):
            raw_text = _first_nonempty_field(row, text_keys)
            text = str(raw_text).strip() if raw_text is not None else None
            if not text:
                continue

            # Rows without any language field are kept; rows whose language
            # value does not match the pattern are dropped.
            raw_lang = _first_nonempty_field(row, lang_keys)
            lang_val = str(raw_lang).lower() if raw_lang is not None else None
            if lang_val and not lang_regex.search(lang_val):
                continue

            audio = row.get("audio")
            if not audio:
                continue

            arr = audio["array"]
            sr = audio["sampling_rate"]
            try:
                arr, sr = ensure_wav_mono16k(np.asarray(arr), int(sr))
            except Exception as e:
                failed += 1
                if failed == 1:
                    # Show the first failure only, to avoid flooding the log.
                    print(f"[warn] {name}: audio conversion failed for row {i} → {e}")
                continue

            dur = float(len(arr) / sr)
            if not (MIN_DUR <= dur <= MAX_DUR):
                continue

            # The file name depends on the row index within the split.
            wav_path = audio_dir / f"{sha1_name(name+'_'+str(i))}.wav"
            save_wav(wav_path, arr, sr)

            item = {"audio_filepath": str(wav_path), "text": text, "duration": dur}
            fo.write(json.dumps(item, ensure_ascii=False) + "\n")
            kept += 1

    if failed:
        print(f"[warn] {name}: {failed} rows skipped (audio conversion failed)")
    print(f"[OK] {name}: {kept} записей")
    return {"name": name, "manifest": str(manifest), "dir": str(out_dir), "kept": kept}


# Подготовка датасетов (WAV + JSONL)


In [None]:
import os
# The token may come from either variable; when neither is set, no token is
# passed to the dataset loader.
HF_TOKEN = os.environ.get("HF_TOKEN") or os.environ.get("HUGGINGFACE_HUB_TOKEN")

prepared = []
for repo, split in [(r, s) for r in LINKS for s in SPLITS]:
    meta = prepare_hf_dataset_to_wav(repo, split, DATA_DIR, RUPAT, hf_token=HF_TOKEN)
    # Keep only splits that loaded and yielded at least one clip.
    if not meta or not (meta["kept"] > 0):
        continue
    prepared.append(meta)

print("Prepared:", len(prepared), "splits")
# Preview the first few prepared splits with their clip counts.
for m in prepared[:5]:
    print(" -", m["name"], "→", m["kept"])


# Инференс Canary + фильтрация + экспорт core/hard


In [None]:
import json, pandas as pd
from jiwer import wer, cer
from pathlib import Path
from nemo.collections.asr.models import ASRModel

def load_canary(model_id: str, nemo_path: str | None):
    """Load the ASR model and switch it to eval mode.

    A truthy *nemo_path* is restored with ``ASRModel.restore_from`` using
    ``map_location="cuda"``; otherwise *model_id* is fetched through
    ``ASRModel.from_pretrained``.
    """
    if not nemo_path:
        asr = ASRModel.from_pretrained(model_name=model_id)
    else:
        asr = ASRModel.restore_from(nemo_path, map_location="cuda")
    return asr.eval()

# Load the model once at cell level; the per-dataset loop further down in
# this cell passes it to transcribe_paths().
model = load_canary(MODEL_ID, NEMO_PATH)

def _hypothesis_text(h):
    """Extract the transcript text from one transcription result.

    Handles plain strings, dicts (``text`` / ``pred_text`` keys) and objects
    exposing a ``.text`` attribute; ``None`` becomes an empty string.
    """
    if h is None:
        return ""
    if isinstance(h, str):
        return h
    if isinstance(h, dict):
        return h.get("text") or h.get("pred_text") or str(h)
    if hasattr(h, "text"):
        # Hypothesis-like object: use its text, never its repr.
        return "" if h.text is None else str(h.text)
    return str(h)


def transcribe_paths(model, paths, batch_size, source_lang, target_lang, task, pnc):
    """Transcribe audio files in batches and map each path to its text.

    Args:
        model: Object with a NeMo-style ``transcribe(batch, batch_size=...,
            source_lang=..., target_lang=..., task=..., pnc=...)`` method.
        paths: Sequence of audio file paths.
        batch_size: Requested batch size; values below 1 are raised to 1.
        source_lang, target_lang, task, pnc: Forwarded to ``transcribe``.

    Returns:
        Dict ``{path: transcript}``. A result with no text yields ``""``.
    """
    import gc
    import torch

    results = {}
    bs = max(1, int(batch_size))
    for start in range(0, len(paths), bs):
        batch = paths[start:start + bs]
        hyps = model.transcribe(batch, batch_size=bs,
                                source_lang=source_lang, target_lang=target_lang,
                                task=task, pnc=pnc)
        # Some NeMo models/versions return (best_hypotheses, all_hypotheses);
        # only the best hypotheses are used here.
        if isinstance(hyps, tuple) and len(hyps) == 2 and isinstance(hyps[0], (list, tuple)):
            hyps = hyps[0]
        for path, hyp in zip(batch, hyps):
            results[path] = _hypothesis_text(hyp)

        # Best-effort release of cached GPU memory between batches; a failure
        # here must not abort the transcription run.
        if torch.cuda.is_available():
            try:
                torch.cuda.empty_cache()
                torch.cuda.ipc_collect()
            except RuntimeError as err:
                print("[WARN] CUDA cache cleanup failed:", err)
        gc.collect()
    return results

def compute_metrics(items, preds):
    """Score every manifest item that has a non-empty prediction.

    Args:
        items: Iterable of dicts with ``audio_filepath``, ``text`` and
            ``duration`` keys.
        preds: Mapping from audio path to hypothesis text.

    Returns:
        List of dicts with keys ``audio``, ``ref``, ``hyp``, ``dur``, ``wer``
        and ``cer``; items without a prediction are left out.
    """
    scored = []
    for entry in items:
        audio_path = entry["audio_filepath"]
        reference = entry["text"]
        duration = entry["duration"]
        hypothesis = preds.get(audio_path, "")
        if hypothesis:
            scored.append({
                "audio": audio_path,
                "ref": reference,
                "hyp": hypothesis,
                "dur": float(duration),
                "wer": float(wer(reference, hypothesis)),
                "cer": float(cer(reference, hypothesis)),
            })
    return scored

def split_by_quantiles(items, q_core, q_hard):
    """Partition scored items into "core" and "hard" buckets by WER quantile.

    Args:
        items: Sequence of dicts carrying a ``wer`` value.
        q_core: Quantile whose WER value is the upper bound of the core bucket.
        q_hard: Quantile whose WER value is the upper bound of the hard bucket.

    Returns:
        ``(core, hard, thresholds)``: ``core`` holds items with WER at or
        below the core threshold, ``hard`` those above it but not above the
        hard threshold, and ``thresholds`` the two cut values. Empty input
        gives two empty lists and zero thresholds.
    """
    if not items:
        return [], [], {"q_core_val":0.0,"q_hard_val":0.0}

    wer_values = pd.Series([entry["wer"] for entry in items], dtype=float)
    core_cut = float(wer_values.quantile(q_core))
    hard_cut = float(wer_values.quantile(q_hard))

    core, hard = [], []
    for entry in items:
        score = entry["wer"]
        if score <= core_cut:
            core.append(entry)
        elif core_cut < score <= hard_cut:
            hard.append(entry)
    return core, hard, {"q_core_val": core_cut, "q_hard_val": hard_cut}

def export_bucket(bucket_name: str, items, ds_dir: Path):
    """Write one bucket's manifest under ``ds_dir / bucket_name``.

    The bucket directory and its ``audio`` sub-directory are created, then
    every item is written as one JSON line to ``manifest.jsonl``. Audio files
    are not copied; the manifest keeps each item's original audio path.

    Returns:
        ``(written, manifest_path)``: the number of lines written and the
        path of the manifest file.
    """
    bucket_root = ds_dir / bucket_name
    manifest_path = bucket_root / "manifest.jsonl"
    for folder in (bucket_root, bucket_root / "audio"):
        folder.mkdir(parents=True, exist_ok=True)

    # Output field name -> key of the scored item it is taken from.
    field_map = (
        ("audio_filepath", "audio"),
        ("text", "ref"),
        ("pred_text", "hyp"),
        ("wer", "wer"),
        ("cer", "cer"),
        ("duration", "dur"),
    )

    written = 0
    with manifest_path.open("w", encoding="utf-8") as sink:
        for written, entry in enumerate(items, start=1):
            record = {out_key: entry[src_key] for out_key, src_key in field_map}
            sink.write(json.dumps(record, ensure_ascii=False) + "\n")
    return written, manifest_path

# One summary record per prepared split, collected for the overview table.
summaries = []

# Parameters recorded in every summary (identical for all splits).
_run_params = {"min_dur": MIN_DUR, "max_dur": MAX_DUR, "cer_max": CER_MAX, "wer_max": WER_MAX,
               "task": TASK, "pnc": USE_PNC, "source_lang": SOURCE_LANG, "target_lang": TARGET_LANG}

for meta in prepared:
    ds_dir = Path(meta["dir"])

    # Read the manifest written during preparation: one JSON object per line.
    with Path(meta["manifest"]).open("r", encoding="utf-8") as f:
        items = list(map(json.loads, f))

    # Transcribe every distinct audio file once.
    uniq_paths = list(set(it["audio_filepath"] for it in items))
    preds = transcribe_paths(model, uniq_paths, BATCH_SIZE, SOURCE_LANG, TARGET_LANG, TASK, USE_PNC)

    # Score, keep items within both error ceilings, then bucket them by WER.
    metrics_all = compute_metrics(items, preds)
    pool = []
    for m in metrics_all:
        if m["cer"] <= CER_MAX and m["wer"] <= WER_MAX:
            pool.append(m)
    core_items, hard_items, qvals = split_by_quantiles(pool, Q_CORE, Q_HARD)

    # Export order: "hard" first, then "core".
    hard_kept, hard_manifest = export_bucket("hard", hard_items, ds_dir)
    core_kept, core_manifest = export_bucket("core", core_items, ds_dir)

    summary = {
        "dataset": meta["name"],
        "total": len(items),
        "pool": len(pool),
        "q_core": Q_CORE,
        "q_hard": Q_HARD,
        "q_core_val": qvals["q_core_val"],
        "q_hard_val": qvals["q_hard_val"],
        "core_selected": len(core_items),
        "core_saved": core_kept,
        "core_manifest": str(core_manifest),
        "hard_selected": len(hard_items),
        "hard_saved": hard_kept,
        "hard_manifest": str(hard_manifest),
        "params": dict(_run_params),
    }
    # Persist the per-split summary next to its manifests.
    with (ds_dir / "summary.json").open("w", encoding="utf-8") as f:
        json.dump(summary, f, ensure_ascii=False, indent=2)
    summaries.append(summary)

# Overview table of all processed splits (displayed as the cell result).
import pandas as pd
df = pd.DataFrame(summaries)
df
