In [2]:
# -*- coding: utf-8 -*-
# SAFE MODE: CPU + int8 + พิมพ์ทีละประโยค + (ตัวเลือก) PRE-DENOISE WITH FFMPEG

import re, math, shutil, subprocess, sys
from pathlib import Path
from faster_whisper import WhisperModel

# ---------- CONFIG ----------
INPUT_PATH = "ENG_TEST_02.mp4"

# ถ้าแรม/ซีพียูน้อย ใช้ "tiny", "base", "small"; ค่าเดิมคือ "large-v3"
MODEL_NAME = "large-v3"

# ใช้ CPU เพื่อเสถียรภาพ (หลีกเลี่ยง CUDA crash)
DEVICE = "cpu"

# โหมดหน่วยความจำต่ำ
# ตัวเลือกที่ใช้ได้บน CPU: "int8", "int8_float32", "int8_float16", "float32"
COMPUTE_TYPE = "int8"

# เปิด/ปิดการลด noise (ถ้าปิด = ใช้ไฟล์ mp4 เพียวๆ)
ENABLE_DENOISE = False

# วิธีลด noise: "afftdn" (ง่ายและไม่ต้องใช้โมเดล) หรือ "arnndn" (ต้องมี rnnoise model)
DENOISE_METHOD = "afftdn"    # "afftdn" | "arnndn"
AFFTDN_PARAMS = "nr=12"      # ปรับความแรงเช่น nr=8..20 ตามสภาพเสียง
HIGHPASS_HZ = 100            # ตัดความถี่ต่ำ (ฮัม/ลม/กระแทก)
LOWPASS_HZ = 7500            # จำกัดย่านสูงเกินจำเป็น
TARGET_SR = 16000            # รีแซมเปิลเป็น 16kHz โมโน

# ถ้าใช้ arnndn (RNNoise), ใส่ path โมเดลที่มี:
ARNNDN_MODEL = "rnnoise-models/onnx/rnnoise-model.onnx"  # ตัวอย่าง path
# ----------------------------

# ตัดเป็นประโยคทีละประโยค
SENTENCE_END = re.compile(r"([.!?…]|ฯ)\s+")
MAX_CHARS_PER_LINE = 42

def srt_timestamp(t):
    if t < 0: t = 0
    h = int(t // 3600); m = int((t % 3600) // 60); s = int(t % 60)
    ms = int(round((t - int(t)) * 1000))
    return f"{h:02}:{m:02}:{s:02},{ms:03}"

def wrap(text, width=MAX_CHARS_PER_LINE):
    out, line, n = [], [], 0
    for w in text.split():
        add = len(w) + (1 if line else 0)
        if n + add <= width:
            line.append(w); n += add
        else:
            out.append(" ".join(line)); line=[w]; n=len(w)
    if line: out.append(" ".join(line))
    return "\n".join(out) if out else text

# ---------- DENOISE UTILS ----------
def has_ffmpeg():
    return shutil.which("ffmpeg") is not None

def build_filter_chain():
    """
    สร้าง filter สำหรับลด noise + ตัดความถี่
    - highpass เพื่อตัดฮัม/ลม/กระแทกความถี่ต่ำ
    - lowpass เพื่อตัด hiss ย่านสูงเกินจำเป็น
    - afftdn ลด noise แบบความถี่
    หรือ arnndn ถ้ามี RNNoise model
    """
    if DENOISE_METHOD == "arnndn":
        return f"highpass=f={HIGHPASS_HZ},lowpass=f={LOWPASS_HZ},arnndn=m='{ARNNDN_MODEL}'"
    else:
        # afftdn: ปรับ nr (1..30 โดยประมาณ) ได้ตามสภาพเสียง
        return f"highpass=f={HIGHPASS_HZ},lowpass=f={LOWPASS_HZ},afftdn={AFFTDN_PARAMS}"

def ffmpeg_denoise(in_path: str) -> str:
    """
    แปลง/แยกเสียง -> โมโน 16kHz และลด noise แล้วบันทึกเป็น WAV ชั่วคราว
    คืนค่า path ใหม่ที่ผ่านการทำความสะอาดแล้ว
    """
    p = Path(in_path)
    out_wav = p.with_suffix("").as_posix() + "_denoised.wav"

    if not has_ffmpeg():
        print("[WARN] ไม่พบ ffmpeg ในระบบ: จะข้ามขั้นตอนลด noise และใช้ไฟล์เดิม")
        return in_path

    af = build_filter_chain()

    # -vn ตัดวิดีโอ
    # -ac 1 โมโน, -ar 16000 รีแซมเปิล, -sample_fmt s16 ให้เข้ากับ whisper
    cmd = [
        "ffmpeg", "-y",
        "-i", in_path,
        "-vn",
        "-acodec", "pcm_s16le",
        "-ac", "1",
        "-ar", str(TARGET_SR),
        "-af", af,
        out_wav
    ]
    print("[FFmpeg]", " ".join(cmd))
    try:
        subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        return out_wav
    except subprocess.CalledProcessError as e:
        sys.stderr.write(f"[ERROR] FFmpeg denoise ล้มเหลว: {e}\n")
        return in_path

# ---------- MAIN ----------
def main():
    # เตรียมไฟล์เอาต์พุต
    base = Path(INPUT_PATH).with_suffix("").name
    srt_path = f"{base}.srt"
    txt_path = f"{base}_sentences.txt"

    # เปิดไฟล์ผลลัพธ์
    try:
        srt_f = open(srt_path, "w", encoding="utf-8")
        txt_f = open(txt_path, "w", encoding="utf-8")
    except OSError as e:
        sys.stderr.write(f"[ERROR] ไม่สามารถเขียนไฟล์ผลลัพธ์: {e}\n")
        sys.exit(1)

    srt_idx = 0

    def write_sentence(s, st, en):
        nonlocal srt_idx
        srt_idx += 1
        srt_f.write(f"{srt_idx}\n{srt_timestamp(st)} --> {srt_timestamp(en)}\n{wrap(s)}\n\n")
        srt_f.flush()
        txt_f.write(s + "\n"); txt_f.flush()
        print(s)

    print("Loading model (CPU,int8)...")
    model = WhisperModel(MODEL_NAME, device=DEVICE, compute_type=COMPUTE_TYPE)

    # ---------- PREPROCESS (OPTIONAL DENOISE) ----------
    audio_path = INPUT_PATH
    if ENABLE_DENOISE:
        audio_path = ffmpeg_denoise(INPUT_PATH)
    else:
        # ใช้ไฟล์วิดีโอ mp4 ตรงๆ (ต้องมี ffmpeg เพื่อให้ faster-whisper ดึงเสียงได้ดี)
        if not has_ffmpeg():
            print("[WARN] ไม่พบ ffmpeg; การอ่านไฟล์วิดีโอโดยตรงอาจล้มเหลว โปรดติดตั้ง ffmpeg ก่อนถ้าเกิดปัญหา")

    # ---------- TRANSCRIBE (STREAM) ----------
    print(f"Transcribing (stream): {audio_path}")
    segments, info = model.transcribe(
        audio_path,
        vad_filter=True, vad_parameters={"min_silence_duration_ms": 300},
        beam_size=1,
        language=None   # ให้เดาอัตโนมัติ (ถ้ารู้ภาษาล่วงหน้า ใส่โค้ดเช่น 'th','en' จะเร็วขึ้น)
    )

    buf_txt, buf_start, last_end = [], None, None

    def flush(force=False):
        nonlocal buf_txt, buf_start, last_end
        if not buf_txt:
            return

        joined = re.sub(r"\s+", " ", " ".join(buf_txt)).strip()
        work = joined + (" " if not joined.endswith(" ") else "")
        parts = SENTENCE_END.split(work)

        sentences = []
        i = 0
        while i < len(parts):
            chunk = (parts[i] or "").strip()
            if i + 1 < len(parts):
                end_mark = parts[i+1] or ""
                s = (chunk + end_mark).strip()
                if s: sentences.append(s)
                i += 2
            else:
                if force and chunk: sentences.append(chunk)
                i += 1

        if not sentences and force and joined:
            sentences = [joined]

        total = sum(len(s) for s in sentences) or 1
        st = buf_start if buf_start is not None else 0.0
        en = last_end if last_end is not None else st
        cur = st

        for s in sentences:
            r = len(s) / total
            dur = max((en - st) * r, 0.001)
            write_sentence(s, cur, cur + dur)
            cur += dur

        buf_txt, buf_start, last_end = [], None, None

    for seg in segments:
        t = (seg.text or "").strip()
        if not t:
            continue
        if buf_start is None:
            buf_start = float(seg.start or 0.0)
        last_end = float(seg.end or buf_start)
        buf_txt.append(t)

        # ถ้าพบจบประโยค ให้ flush ออกทีละประโยค
        if SENTENCE_END.search(t + " "):
            flush(force=False)

    # เก็บกวาดประโยคค้างบัฟเฟอร์
    flush(force=True)

    srt_f.close()
    txt_f.close()
    print(f"\nSaved SRT: {srt_path}\nSaved TXT: {txt_path}")

if __name__ == "__main__":
    main()


Loading model (CPU,int8)...
Transcribing (stream): ENG_TEST_02.mp4
Excuse me, could you tell me where the nearest post office is?
Yes, it's down the high street across the road from the newsagents.
Is it the building next to the shoe shop?
No, that's the cinema.
The post office is between the bank and the supermarket.
Excuse me, could you tell me where the nearest post office is?
Yes, it's down the high street across the road from the newsagents.
Is it the building next to the shoe shop?
No, that's the cinema.
The post office is between the bank and the supermarket.
Did you get tickets for the concert?
Yes, but I didn't go.
Why?
Did you lose them?
No, my friend was ill and I didn't want to go alone.
You know my boss?
Yeah.
Well, I'm having a bit of a problem with her.
OK, what's the matter?
She just explained...
She just expects me to do too much.
Have you talked to her about it?
Yes, but she just doesn't listen to me.
I see.
Do any of your colleagues feel the same way?
Some of the you