## 路徑設定及安裝

In [None]:
# !pip install git+https://github.com/openai/whisper.git

In [None]:
import os
from pathlib import Path
from groq import Groq
from pathlib import Path
import whisperx
import torch
import json
from tqdm import tqdm  # ← 這樣匯入的是函數，而非整個模組
import difflib
import re
from pydub import AudioSegment
import noisereduce as nr
import librosa
import soundfile as sf
import numpy as np
import whisper
import re

# Root folder of the project data; replace the placeholder with a real path.
base_path = Path(r"your_path")

# Input audio folders (first pass and files selected for a second pass).
WAV_Dataset = base_path.joinpath("WAV")
WAV_AGAIN_Dataset = base_path.joinpath("WAV_AGAIN")

# Transcription outputs (tab-separated "<fid>\t<text>" files).
submission_task1_answer_ZHEN = base_path.joinpath("submission/task1_answer_ZHEN.txt")
submission_task1_answer_TWZH = base_path.joinpath("submission/task1_answer_TWZH.txt")

# Word-level timestamp outputs (one JSON object per line).
task1_answer_timestamps = base_path.joinpath("task1_answer_timestamps.json")
task1_answer_timestamps_ZH = base_path.joinpath("task1_answer_timestamps_ZH.json")

# Use the GPU when torch reports one, otherwise fall back to the CPU.
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
print(f"device: {device}")

## task1_answer_ZHEN
# 關通知
import os
import sys
import contextlib


@contextlib.contextmanager
def suppress_stdout():
    """Discard everything written to ``sys.stdout`` inside the ``with`` body.

    ``sys.stdout`` is pointed at ``os.devnull`` for the duration of the block
    and the previous stream is put back on exit, including when the body
    raises.
    """
    with open(os.devnull, "w") as sink, contextlib.redirect_stdout(sink):
        yield

## task1_answer_ZHEN

In [None]:
# 關通知
import os
import sys
import contextlib

@contextlib.contextmanager
def suppress_stdout():
    """Silence ``print`` output for the duration of a ``with`` block.

    Swaps ``sys.stdout`` for a handle on ``os.devnull`` and restores the
    previous stream afterwards, whether or not the body raised.
    """
    null_stream = open(os.devnull, "w")
    previous_stream = sys.stdout
    sys.stdout = null_stream
    try:
        yield
    finally:
        # Restore first, then release the devnull handle.
        sys.stdout = previous_stream
        null_stream.close()


# === Load the WhisperX model (automatic language detection) ===
batch_size = 1
# NOTE(review): the dict below is handed to WhisperX unchanged; whether the
# installed version interprets {"vad": False} as "disable VAD" cannot be told
# from this file - confirm against the WhisperX version in use.
model = whisperx.load_model(
    "large-v3",
    device=device,
    language=None,               # auto-detect so mixed Chinese/English is kept
    vad_options={"vad": False}   # intended to switch VAD off
)

# === Transcribe every WAV file into "<fid>\t<text>" lines ===
with open(submission_task1_answer_ZHEN, "w", encoding="utf-8") as fout:
    wav_files = sorted(WAV_Dataset.glob("*.wav"))
    failed_ids = []  # files that raised and therefore have no output line

    with tqdm(total=len(wav_files), desc="Transcribing WAV files") as pbar:
        for i, file in enumerate(wav_files):
            file_id = file.stem
            try:
                # Hide the library's console output while transcribing.
                with suppress_stdout():
                    result = model.transcribe(str(file), batch_size=batch_size)

                segments = result.get("segments", [])
                full_text = " ".join(seg["text"].strip() for seg in segments).strip()

                # Strip special tokens that may leak into the text.
                for token in ["<|startoftranscript|>", "<|en|>", "<|zh|>", "<|transcribe|>", "<|notimestamps|>"]:
                    full_text = full_text.replace(token, "")
                full_text = full_text.replace("Ġ", "").strip()

                # Write and flush immediately so partial runs keep their results.
                fout.write(f"{file_id}\t{full_text}\n")
                fout.flush()

            except Exception as e:
                # Best effort: report the failure and move on to the next file.
                print(f"[ERROR] 處理 {file_id} 失敗：{e}")
                failed_ids.append(file_id)

            # Refresh the bar every 10 files and on the last one. This runs for
            # failed files too, so the bar always ends at the real total.
            done = i + 1
            if done % 10 == 0 or done == len(wav_files):
                pbar.update(done - pbar.n)

print(f"\n✅ 完成轉錄（中英文混合）：共處理 {len(wav_files)} 筆，結果已儲存至：{submission_task1_answer_ZHEN}")
if failed_ids:
    print(f"[WARN] 其中 {len(failed_ids)} 筆轉錄失敗，未寫入結果：{failed_ids}")

## 調整 task1_answer_ZHEN.txt 順序（依 fid 排序）

In [None]:
import csv
from pathlib import Path

import pandas as pd

# Read the tab-separated answer file as raw text fields.
# - QUOTE_NONE: a '"' inside a transcription is ordinary text, not a CSV quote.
# - keep_default_na=False: transcriptions such as "NA", "null" or "nan" stay
#   strings instead of becoming missing values that are written back empty.
df = pd.read_csv(
    submission_task1_answer_ZHEN,
    sep="\t",
    header=None,
    names=["fid", "text"],
    quoting=csv.QUOTE_NONE,
    keep_default_na=False,
)

# Report the row count and any duplicated fid.
print(f"總筆數：{len(df)}")
duplicates = df[df.duplicated(subset="fid", keep=False)]
if not duplicates.empty:
    print("發現重複的 fid：")
    print(duplicates)
else:
    print("無重複 fid")

# Sort by fid and overwrite the file, again without adding any quoting.
df_sorted = df.sort_values(by="fid")
df_sorted.to_csv(
    submission_task1_answer_ZHEN,
    sep="\t",
    index=False,
    header=False,
    encoding="utf-8",
    quoting=csv.QUOTE_NONE,
)
print(f"\n已重新依 fid 排序並儲存至：{submission_task1_answer_ZHEN}")

## 全EN時間戳

In [None]:
from pathlib import Path
import whisperx
import torch
import json
from tqdm import tqdm

# === Load the WhisperX recognition model (English) ===
batch_size = 1
model = whisperx.load_model("large-v3", device=device, language="en")

# === Load the English alignment model ===
align_model, metadata = whisperx.load_align_model(language_code="en", device=device)

# === Transcribe each WAV file and write word-level timestamps (JSON lines) ===
with open(task1_answer_timestamps, "w", encoding="utf-8") as fout:
    wav_files = sorted(WAV_Dataset.glob("*.wav"))

    for file in tqdm(wav_files, desc="Aligning English timestamps"):
        file_id = file.stem
        try:
            # Speech recognition, then forced alignment of the segments.
            result = model.transcribe(str(file), batch_size=batch_size, language="en")
            result_aligned = whisperx.align(result["segments"], align_model, metadata, str(file), device=device)
            word_segments = result_aligned.get("word_segments", [])

            filtered_words = []
            skipped_words = []

            for w in word_segments:
                # A word without all three fields cannot be emitted with a time span.
                if not all(field in w for field in ("word", "start", "end")):
                    skipped_words.append(w.get("word", "<UNKNOWN>"))
                    continue
                filtered_words.append({field: w[field] for field in ("word", "start", "end")})

            if skipped_words:
                print(f"[WARN] {file_id} 有 {len(skipped_words)} 個單字無法對齊時間，已略過：{skipped_words}")

            entry = {"filename": file_id, "language": "en", "words": filtered_words}
            fout.write(f"{json.dumps(entry, ensure_ascii=False)}\n")
            fout.flush()

        except Exception as e:
            # Best effort: report the failure and carry on with the next file.
            print(f"[ERROR] 處理 {file.name} 失敗：{e}")

print(f"\n✅ 全英文 timestamp 對齊完成，結果儲存於：{task1_answer_timestamps}")


## 純中文時間戳

In [None]:
from pathlib import Path
import whisperx
import torch
import json
from tqdm import tqdm

def _split_word_segments(word_segments):
    """Split aligned word records into (usable records, skipped word texts).

    A record is usable when it carries "word", "start" and "end"; otherwise
    its word text (or "<UNKNOWN>") is reported as skipped.
    """
    usable, unaligned = [], []
    for seg in word_segments:
        if "word" in seg and "start" in seg and "end" in seg:
            usable.append({"word": seg["word"], "start": seg["start"], "end": seg["end"]})
        else:
            unaligned.append(seg.get("word", "<UNKNOWN>"))
    return usable, unaligned


# === Load the WhisperX Chinese model and aligner ===
batch_size = 1
model_zh = whisperx.load_model("large-v3", device=device, language="zh")
align_model_zh, metadata_zh = whisperx.load_align_model(language_code="zh", device=device)

# === Write word-level timestamps, only for files whose numeric fid >= 80000 ===
with open(task1_answer_timestamps_ZH, "w", encoding="utf-8") as fout:
    wav_files = sorted(WAV_Dataset.glob("*.wav"))

    for file in tqdm(wav_files, desc="Aligning Chinese timestamps (fid ≥ 80000)"):
        file_id = file.stem

        # The file stem must be an integer id; anything else is skipped.
        try:
            fid_int = int(file_id)
        except ValueError:
            print(f"[SKIP] 無效的 file_id：{file_id}")
            continue

        # Ids below 80000 are not handled by this cell.
        if fid_int < 80000:
            continue

        try:
            # Chinese speech recognition, then forced alignment.
            result = model_zh.transcribe(str(file), batch_size=batch_size, language="zh")
            result_aligned = whisperx.align(result["segments"], align_model_zh, metadata_zh, str(file), device=device)
            word_segments = result_aligned.get("word_segments", [])

            filtered_words, skipped_words = _split_word_segments(word_segments)

            if skipped_words:
                print(f"[WARN] {file_id} 有 {len(skipped_words)} 個單字無法對齊時間，已略過：{skipped_words}")

            entry = {"filename": file_id, "language": "zh", "words": filtered_words}
            fout.write(f"{json.dumps(entry, ensure_ascii=False)}\n")
            fout.flush()

        except Exception as e:
            # Best effort: report the failure and carry on with the next file.
            print(f"[ERROR] 處理 {file.name} 失敗：{e}")

print(f"\n✅ 只針對 fid ≥ 80000 的檔案完成中文 timestamp 對齊，結果儲存於：{task1_answer_timestamps_ZH}")


## 特殊無法辨別的，再辨別一次

In [None]:
# ========== Load the recognition model and read the existing ZHEN file ==========
batch_size = 1
model = whisperx.load_model("large-v3", device=device, language="zh")

result_dict = {}     # fid -> transcription text
invalid_lines = []   # lines without a usable "<fid>\t" prefix


with open(submission_task1_answer_ZHEN, "r", encoding="utf-8") as fin:
    for line in fin:
        # Strip only the line ending before splitting. A full strip() would
        # remove the tab of a "fid\t" record (empty transcription), and that
        # record would then be dropped when the file is rewritten below.
        file_id, tab, text = line.rstrip("\r\n").partition("\t")
        file_id = file_id.lstrip()
        if tab and file_id:
            result_dict[file_id] = text.rstrip()
        else:
            invalid_lines.append(line.strip())

# ========== Re-transcribe everything in WAV_AGAIN ==========
wav_again_files = sorted(WAV_AGAIN_Dataset.glob("*.wav"))
updated_file_ids = []
updated_log = {}

print(f"\n🔁 重新處理 {len(wav_again_files)} 筆 WAV_AGAIN 檔案...")

with tqdm(total=len(wav_again_files), desc="Re-transcribing (WAV_AGAIN)") as pbar:
    for file in wav_again_files:
        file_id = file.stem
        try:
            # Hide the library's console output while transcribing.
            with suppress_stdout():
                result = model.transcribe(str(file), batch_size=batch_size)

            segments = result.get("segments", [])
            full_text = " ".join(seg["text"].strip() for seg in segments).strip()

            # Strip special tokens that may leak into the text.
            for token in ["<|startoftranscript|>", "<|en|>", "<|zh|>", "<|transcribe|>", "<|notimestamps|>"]:
                full_text = full_text.replace(token, "")
            full_text = full_text.replace("Ġ", "").strip()

            old_text = result_dict.get(file_id, "")
            result_dict[file_id] = full_text
            updated_file_ids.append(file_id)
            updated_log[file_id] = {
                "before": old_text,
                "after": full_text
            }

        except Exception as e:
            # Best effort: keep the previous text for this file and continue.
            print(f"[ERROR] WAV_AGAIN 處理 {file_id} 失敗：{e}")

        # Advance for failed files too, so the bar reaches its total.
        pbar.update(1)

# ========== Write the merged result back to the ZHEN file ==========
# NOTE(review): keys are strings, so this order is lexicographic; confirm it
# matches the ordering expected by the rest of the pipeline.
with open(submission_task1_answer_ZHEN, "w", encoding="utf-8") as fout:
    for file_id in sorted(result_dict):
        fout.write(f"{file_id}\t{result_dict[file_id]}\n")

# ========== Print the change log ==========
if updated_log:
    print("\n🔁 以下檔案已重新轉錄並更新：")
    for fid in sorted(updated_log):
        before = updated_log[fid]["before"]
        after = updated_log[fid]["after"]
        print(f" - {fid}")
        print(f"   🟡 Before: {before}")
        print(f"   🟢 After : {after}")
else:
    print("\n⚠️ 沒有任何檔案被更新。")

# ========== Report lines that could not be parsed (if any) ==========
if invalid_lines:
    print("\n以下行無法解析（缺少TAB或欄位不完整），請檢查原始檔案：")
    for line in invalid_lines:
        print(" -", line)


## 清洗文字，將簡體轉繁體+轉換特殊文字

In [None]:
from opencc import OpenCC

# File paths
input_path = submission_task1_answer_ZHEN
output_path = submission_task1_answer_TWZH

# Step 1: convert Simplified Chinese to Traditional Chinese
cc = OpenCC('s2t')

with open(input_path, "r", encoding="utf-8") as fin:
    lines = fin.readlines()

converted_lines = [cc.convert(line) for line in lines]

# Step 2: character substitutions applied on top of the converter output
custom_replacements = {
    "牀": "床",
    "喫": "吃",
    "台": "臺",
}

# Step 3: apply the substitutions and record what changed per fid
final_lines = []
log = {}

for line in converted_lines:
    # Strip only the line ending before splitting. A full strip() would remove
    # the tab of a "fid\t" record (empty sentence) and make the unpack fail.
    fid, tab, sentence = line.rstrip("\r\n").partition("\t")
    if not tab:
        # Not a "<fid>\t<text>" record: pass it through untouched.
        final_lines.append(line)
        continue

    fid = fid.lstrip()
    sentence = sentence.rstrip()
    changes = []

    for wrong, correct in custom_replacements.items():
        if wrong in sentence:
            sentence = sentence.replace(wrong, correct)
            changes.append((wrong, correct))

    final_lines.append(f"{fid}\t{sentence}\n")

    if changes:
        log[fid] = changes

# Step 4: write the converted and corrected lines to the TWZH file
with open(output_path, "w", encoding="utf-8") as fout:
    fout.writelines(final_lines)

# Step 5: print the substitution log
if log:
    print("修正記錄：")
    for fid in sorted(log):
        print(f"- {fid}")
        for wrong, correct in log[fid]:
            print(f"  ⮕ 替換：{wrong} ➜ {correct}")

## 把重複字的改掉

In [None]:
import re

# Maximum number of consecutive repetitions that is kept
MAX_REPEATS = 5


def limit_repeated_tokens(text, max_repeats=MAX_REPEATS):
    """Cap runs of immediately repeated characters or short tokens.

    Two passes are applied in order:

    1. a single character repeated more than ``max_repeats`` times in a row
       is cut down to ``max_repeats`` copies;
    2. a whitespace-free token of 2 to 10 characters repeated back to back
       more than ``max_repeats`` times is cut down to ``max_repeats`` copies.

    Runs of ``max_repeats`` or fewer are left unchanged. Repetitions that are
    separated by whitespace are not treated as a run.

    Args:
        text: The string to clean.
        max_repeats: Largest run length to keep; must be at least 1.

    Returns:
        The cleaned string.

    Raises:
        ValueError: If ``max_repeats`` is smaller than 1. Such a value would
            otherwise make the patterns match every run and erase the text.
    """
    if max_repeats < 1:
        raise ValueError(f"max_repeats must be >= 1, got {max_repeats}")

    # "{n,}" after the first occurrence means n or more further copies,
    # i.e. a run strictly longer than max_repeats.
    extra = "{" + str(max_repeats) + ",}"

    def keep_max(match):
        return match.group(1) * max_repeats

    # Pass 1: single characters
    text = re.sub(r'(.)\1' + extra, keep_max, text)
    # Pass 2: short multi-character tokens (lazy, so the shortest unit wins)
    text = re.sub(r'(\S{2,10}?)(?:\1)' + extra, keep_max, text)

    return text

# Read the TWZH file
with open(submission_task1_answer_TWZH, "r", encoding="utf-8") as fin:
    lines = fin.readlines()

# Clean every record and log the ones that changed
processed_lines = []
mod_log = {}

for line in lines:
    # Strip only the line ending before splitting. A full strip() would remove
    # the tab of a "fid\t" record (empty sentence) and make the unpack fail.
    fid, tab, sentence = line.rstrip("\r\n").partition("\t")
    if not tab:
        # Not a "<fid>\t<text>" record: pass it through untouched.
        processed_lines.append(line)
        continue

    fid = fid.lstrip()
    sentence = sentence.rstrip()
    cleaned_sentence = limit_repeated_tokens(sentence)

    if cleaned_sentence != sentence:
        mod_log[fid] = {
            "before": sentence,
            "after": cleaned_sentence
        }

    processed_lines.append(f"{fid}\t{cleaned_sentence}\n")

# Overwrite the TWZH file with the cleaned records
with open(submission_task1_answer_TWZH, "w", encoding="utf-8") as fout:
    fout.writelines(processed_lines)

# Print the modification log
if mod_log:
    print("以下行已進行重複字元/詞語壓縮：")
    for fid in sorted(mod_log):
        print(f"- {fid}")
        print(f"  🟡 Before: {mod_log[fid]['before']}")
        print(f"  🟢 After : {mod_log[fid]['after']}")
