In [9]:
import urllib.request, zipfile, io, os
from pathlib import Path

# 저장 위치
VOSK_MODEL_ROOT = Path("/Users/leejeje/Desktop/DSL/25-1/Modeling/model")
VOSK_MODEL_ROOT.mkdir(parents=True, exist_ok=True)

# 한국어 소형 모델 (공식 목록에 기재된 이름)
MODEL_URL  = "https://alphacephei.com/vosk/models/vosk-model-small-ko-0.22.zip"  # 공식 모델 페이지에 등재. 
TARGET_DIR = VOSK_MODEL_ROOT / "vosk-model-small-ko-0.22"

if TARGET_DIR.exists() and any(TARGET_DIR.iterdir()):
    print("[skip] 이미 존재:", TARGET_DIR)
else:
    print("다운로드:", MODEL_URL)
    with urllib.request.urlopen(MODEL_URL) as resp:
        data = resp.read()
    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        zf.extractall(VOSK_MODEL_ROOT)
    # 보통 zip 안에 동일한 폴더명이 들어있음
    print("압축 해제 완료 →", VOSK_MODEL_ROOT)
    
VOSK_MODEL_DIR = TARGET_DIR if TARGET_DIR.exists() else next(VOSK_MODEL_ROOT.glob("vosk-model-small-ko-0.22*"))
print("VOSK_MODEL_DIR =", VOSK_MODEL_DIR)


[skip] 이미 존재: /Users/leejeje/Desktop/DSL/25-1/Modeling/model/vosk-model-small-ko-0.22
VOSK_MODEL_DIR = /Users/leejeje/Desktop/DSL/25-1/Modeling/model/vosk-model-small-ko-0.22


In [10]:
from vosk import Model

assert VOSK_MODEL_DIR.exists(), f"모델 폴더가 없습니다: {VOSK_MODEL_DIR}"
vosk_model = Model(str(VOSK_MODEL_DIR))
print("✅ Vosk 모델 로드 완료:", VOSK_MODEL_DIR.name)


LOG (VoskAPI:ReadDataFiles():model.cc:213) Decoding params beam=10 max-active=3000 lattice-beam=2
LOG (VoskAPI:ReadDataFiles():model.cc:216) Silence phones 1:2:3:4:5:6:7:8:9:10
LOG (VoskAPI:RemoveOrphanNodes():nnet-nnet.cc:948) Removed 1 orphan nodes.
LOG (VoskAPI:RemoveOrphanComponents():nnet-nnet.cc:847) Removing 2 orphan components.
LOG (VoskAPI:Collapse():nnet-utils.cc:1488) Added 1 components, removed 2
LOG (VoskAPI:ReadDataFiles():model.cc:248) Loading i-vector extractor from /Users/leejeje/Desktop/DSL/25-1/Modeling/model/vosk-model-small-ko-0.22/ivector/final.ie
LOG (VoskAPI:ComputeDerivedVars():ivector-extractor.cc:183) Computing derived variables for iVector extractor
LOG (VoskAPI:ComputeDerivedVars():ivector-extractor.cc:204) Done.
LOG (VoskAPI:ReadDataFiles():model.cc:282) Loading HCL and G from /Users/leejeje/Desktop/DSL/25-1/Modeling/model/vosk-model-small-ko-0.22/graph/HCLr.fst /Users/leejeje/Desktop/DSL/25-1/Modeling/model/vosk-model-small-ko-0.22/graph/Gr.fst
LOG (VoskA

✅ Vosk 모델 로드 완료: vosk-model-small-ko-0.22


In [11]:
import numpy as np, soundfile as sf, json
from vosk import KaldiRecognizer

def _resample_to_16k(wave: np.ndarray, sr: int) -> np.ndarray:
    if sr == 16000:
        return wave.astype("float32", copy=False)
    x = np.arange(len(wave))
    new_len = int(round(len(wave) * 16000 / sr))
    new_x = np.linspace(0, len(wave)-1, new_len)
    return np.interp(new_x, x, wave).astype("float32")

def vosk_transcribe_file(wav_path, model: Model, set_words=False, chunk_ms=50) -> str:
    """wav 파일을 Vosk로 인식하여 1줄 텍스트 반환"""
    audio, sr = sf.read(str(wav_path), dtype="float32", always_2d=False)
    if audio.ndim == 2:
        audio = audio.mean(axis=1)
    audio = _resample_to_16k(audio, sr)  # 16k로 정규화
    sr = 16000

    # Vosk는 int16 PCM 바이트 입력을 기대 -> 변환
    pcm16 = (np.clip(audio, -1.0, 1.0) * 32767.0).astype(np.int16)

    rec = KaldiRecognizer(model, sr)
    if set_words:
        rec.SetWords(True)

    # 50ms 단위(기본)로 스트리밍 투입
    frames_per_chunk = int(sr * (chunk_ms / 1000.0))
    offset = 0
    n = len(pcm16)
    while offset < n:
        chunk = pcm16[offset:offset+frames_per_chunk]
        rec.AcceptWaveform(chunk.tobytes())
        offset += frames_per_chunk

    last = json.loads(rec.FinalResult() or "{}")
    return (last.get("text") or "").strip()


In [12]:
import os, re, time, unicodedata, csv
from pathlib import Path

RE_BRACKETS_ALL = re.compile(r"\([^)]*\)|\[[^\]]*\]|\{[^}]*\}|<[^>]*>")
RE_LETTER_SLASH = re.compile(r"(?<!\S)[a-zA-Z]+/\s*")  # 'o/' 'b/' 'l/' 등 토큰 통째 제거
RE_COLON_PREFIX = re.compile(r"^\s*:+\s*")             # 문두 ':: ' 제거
RE_PUNCT        = re.compile(r"[^\w\s]", flags=re.UNICODE)
RE_WS           = re.compile(r"\s+")


def u_nfc(s: str) -> str:
    return unicodedata.normalize("NFC", s)


def normalize_for_wer(s: str) -> str:
    s = u_nfc(s).lower()
    s = RE_COLON_PREFIX.sub(" ", s)       # :: 제거
    s = RE_BRACKETS_ALL.sub(" ", s)       # 괄호 주석 제거
    s = RE_LETTER_SLASH.sub(" ", s)       # 'o/' 'b/' 등 제거
    s = RE_PUNCT.sub(" ", s)              # 나머지 문장부호 -> 공백
    s = RE_WS.sub(" ", s).strip()         # 공백 정규화
    return s

def normalize_for_cer(s: str) -> str:
    return normalize_for_wer(s).replace(" ", "")

def levenshtein(seq_a, seq_b):
    n, m = len(seq_a), len(seq_b)
    if n == 0: return m
    if m == 0: return n
    dp = list(range(m+1))
    for i in range(1, n+1):
        prev, dp[0] = dp[0], i
        for j in range(1, m+1):
            cur = dp[j]
            cost = 0 if seq_a[i-1] == seq_b[j-1] else 1
            dp[j] = min(dp[j] + 1, dp[j-1] + 1, prev + cost)
            prev = cur
    return dp[m]

def cer_score(ref: str, hyp: str):
    r = normalize_for_cer(ref)
    h = normalize_for_cer(hyp)
    if len(r) == 0: return 0.0, 0, 0
    dist = levenshtein(r, h)
    return dist / len(r), dist, len(r)

def wer_score(ref: str, hyp: str):
    r = normalize_for_wer(ref).split()
    h = normalize_for_wer(hyp).split()
    if len(r) == 0: return 0.0, 0, 0
    dist = levenshtein(r, h)
    return dist / len(r), dist, len(r)

def load_trn(trn_path) -> dict:
    """
    TRN 라인 예시 여러 형태를 모두 허용:
      1) '문장 텍스트 ... (KsponSpeech_E00001)'
      2) 'KsponSpeech_E00001 문장 텍스트 ...'
      3) 'KsponSpeech_E00001.wav\t문장 텍스트 ...'
    반환: { 'KsponSpeech_E00001': '문장 텍스트 ...', ... }
    """
    trn_path = Path(trn_path)
    mapping = {}
    with trn_path.open("r", encoding="utf-8") as f:
        for line in f:
            line = line.strip()
            if not line:
                continue

            # 케이스 1) 마지막 괄호의 ID
            m = re.search(r"\(([^)]+)\)\s*$", line)
            if m:
                utt = m.group(1)
                text = line[:m.start()].strip()
            else:
                # 케이스 2/3) <utt>[.wav] <sep> <text>
                parts = re.split(r"[\t ]+", line, maxsplit=1)
                if len(parts) == 2:
                    utt, text = parts[0], parts[1]
                else:
                    # 파싱 실패 시 스킵
                    continue

            utt = os.path.basename(utt)
            utt = os.path.splitext(utt)[0]  # .wav 제거
            mapping[utt] = text
    return mapping


In [13]:
import time, csv
from pathlib import Path
import soundfile as sf

def evaluate_split_vosk(wav_dir, trn_path, model: Model, csv_out=None, log_every=20, preview_miss=10):
    wav_dir  = Path(wav_dir)
    trn_path = Path(trn_path)
    csv_out  = Path(csv_out) if csv_out is not None else None

    refs = load_trn(trn_path)
    wavs = sorted(wav_dir.glob("*.wav"))
    print(f"WAV: {len(wavs)} | TRN entries: {len(refs)}")

    stems = [w.stem for w in wavs]
    missing = [s for s in stems if s not in refs]
    if missing:
        print(f"[경고] TRN에서 찾을 수 없는 wav 키: {len(missing)}/{len(wavs)}개")
        for x in missing[:preview_miss]:
            print("  -", x)

    rows = []
    tot_cdist = tot_cN = 0
    tot_wdist = tot_wN = 0
    tot_secs  = tot_infer = 0.0

    for i, wav in enumerate(wavs, 1):
        utt = wav.stem
        ref = refs.get(utt)
        if ref is None:
            continue

        # 길이(sec)
        audio, sr = sf.read(str(wav), dtype="float32", always_2d=False)
        dur = float(len(audio) / sr)

        # 추론
        t0 = time.perf_counter()
        hyp = vosk_transcribe_file(wav, model, set_words=False, chunk_ms=50)
        t1 = time.perf_counter()

        infer = t1 - t0
        rtf = infer / max(dur, 1e-9)

        cer, cdist, cN = cer_score(ref, hyp)
        wer, wdist, wN = wer_score(ref, hyp)

        rows.append({
            "utt": utt,
            "dur_s": round(dur, 3),
            "infer_s": round(infer, 3),
            "rtf": round(rtf, 3),
            "CER": round(cer, 4),
            "WER": round(wer, 4),
            "ref": ref,
            "hyp": hyp,
        })

        tot_cdist += cdist; tot_cN += cN
        tot_wdist += wdist; tot_wN += wN
        tot_secs  += dur;   tot_infer += infer

        if i % log_every == 0:
            cum_cer = (tot_cdist/tot_cN) if tot_cN else 0.0
            cum_wer = (tot_wdist/tot_wN) if tot_wN else 0.0
            avg_rtf = (tot_infer/tot_secs) if tot_secs else 0.0
            print(f"[{i}/{len(wavs)}] RTF {rtf:.2f} | CER {cer:.3f} | WER {wer:.3f} || cum: RTF {avg_rtf:.2f}, CER {cum_cer:.3f}, WER {cum_wer:.3f}")

    if not rows:
        print("\n[중단] 매칭된 항목이 없습니다.")
        return {"rows": [], "summary": {"files": 0, "unmatched": len(missing)}}

    overall_cer = (tot_cdist / tot_cN) if tot_cN else 0.0
    overall_wer = (tot_wdist / tot_wN) if tot_wN else 0.0
    avg_rtf     = (tot_infer / tot_secs) if tot_secs else 0.0

    print("\n=== Summary (Vosk) ===")
    print(f"Files scored     : {len(rows)} (unmatched: {len(missing)})")
    print(f"Total audio (s)  : {tot_secs:.1f}")
    print(f"Total infer (s)  : {tot_infer:.1f}")
    print(f"Avg RTF          : {avg_rtf:.3f}")
    print(f"CER (char)       : {overall_cer:.4f}")
    print(f"WER (word)       : {overall_wer:.4f}")

    if csv_out:
        csv_out.parent.mkdir(parents=True, exist_ok=True)
        fieldnames = ["utt","dur_s","infer_s","rtf","CER","WER","ref","hyp"]
        with open(csv_out, "w", newline="", encoding="utf-8") as f:
            w = csv.DictWriter(f, fieldnames=fieldnames)
            w.writeheader()
            for r in rows:
                w.writerow(r)
        print(f"Saved: {csv_out}")

    rows_sorted = sorted(rows, key=lambda r: (-r["CER"], -r["WER"], -r["rtf"]))
    print("\nTop-5 by CER:")
    for r in rows_sorted[:5]:
        print(f"- {r['utt']} | CER {r['CER']:.3f} WER {r['WER']:.3f} RTF {r['rtf']:.2f}")
        print(f"  ref: {r['ref']}")
        print(f"  hyp: {r['hyp']}")

    return {
        "rows": rows,
        "summary": dict(files=len(rows), unmatched=len(missing),
                        total_audio_s=tot_secs, total_infer_s=tot_infer,
                        avg_rtf=avg_rtf, cer=overall_cer, wer=overall_wer)
    }


In [14]:
from pathlib import Path

wav_dir  = Path("/Users/leejeje/Desktop/DSL/25-1/Modeling/data/KsponSpeech_eval/eval_clean")
trn_path = Path("/Users/leejeje/Desktop/DSL/25-1/Modeling/data/KsponSpeech_scripts/eval_clean.trn")
out_csv  = Path("results_eval") / "vosk_eval_clean_results.csv"

res_vosk_clean = evaluate_split_vosk(wav_dir, trn_path, vosk_model, csv_out=out_csv)


WAV: 3000 | TRN entries: 3000
[경고] TRN에서 찾을 수 없는 wav 키: 19/3000개
  - KsponSpeech_E00054
  - KsponSpeech_E00135
  - KsponSpeech_E00277
  - KsponSpeech_E00511
  - KsponSpeech_E00581
  - KsponSpeech_E00950
  - KsponSpeech_E01113
  - KsponSpeech_E01343
  - KsponSpeech_E01352
  - KsponSpeech_E01377
[20/3000] RTF 0.87 | CER 0.538 | WER 0.789 || cum: RTF 1.42, CER 0.602, WER 0.962
[40/3000] RTF 1.63 | CER 0.636 | WER 1.000 || cum: RTF 1.80, CER 0.641, WER 0.947


KeyboardInterrupt: 