# AI 語音降噪 Colab
這個 Colab 筆記本可選用 [SpeechBrain](https://github.com/speechbrain/speechbrain) 的 MetricGAN+ 模型（較省記憶體），或 [DNS64](https://github.com/facebookresearch/denoiser) 模型（效果較佳，但需要較多 GPU 記憶體）進行語音降噪；可使用 GPU 加速，沒有可用的 GPU 時會改用 CPU。

載入 Google Drive 中的影音或聲音檔案（單一檔案或整個資料夾），並將降噪結果存成與原檔同資料夾的 `<原檔名>_denoised.wav` 聲音檔。

- 新增開發者模式可顯示詳細系統訊息 / A new developer mode shows detailed system messages.
- 每次檔案處理後會釋放 GPU 記憶體，避免 VRAM 累積 / GPU memory is released after each file to avoid VRAM buildup.

In [None]:
#@title 🎉 一鍵啟動語音降噪 / One-click Audio Denoiser
# NOTE: the `#@title`, `#@markdown` and `#@param` comments in this cell are
# Colab form annotations; keep them on their current lines and in this form.

# Mount Google Drive so `input_path` can point at files under /content/drive.
print("📂 掛載雲端硬碟中 / Mounting Google Drive...")
from google.colab import drive
drive.mount('/content/drive')

# 👉 Edit as needed (may be a folder or a single file path).
#@markdown #### 輸入路徑 / Input path
#@markdown 指定要處理的檔案或資料夾路徑。 / Specify the file or folder path to process.
input_path = "/content/drive/MyDrive/TO/YOUR/FILE"  #@param {type:"string"}

#@markdown #### 使用 GPU / Use GPU
#@markdown 使用 GPU 可加速，但需要支援的裝置。 / GPU acceleration requires compatible hardware.
use_gpu = True  #@param {type:"boolean"}

#@markdown #### 降噪模型 / Denoising model
#@markdown MetricGAN+ 省記憶體；DNS64 效果較佳但耗記憶體。 / MetricGAN+ uses less memory; DNS64 offers better quality but needs more memory.
model_choice = "DNS64"  #@param ["MetricGAN+","DNS64"] {type:"string"}

#@markdown #### 開發者模式 / Developer mode
#@markdown 顯示更多系統訊息，除錯用。一般使用者可保持關閉。 / Show detailed system messages for debugging; normally keep off.
developer_mode_option = "關閉 / Off"  #@param ["關閉 / Off","開啟 / On"] {type:"string"}
# Turn the dropdown label into the boolean flag read by `log()`.
developer_mode = developer_mode_option == "開啟 / On"
print(f"🛠️ 開發者模式 / Developer mode: {developer_mode}")

#@markdown #### 已存在輸出檔案 / Existing output files
#@markdown 遇到已產生的 `_denoised.wav` 時選擇覆蓋或跳過。 / Choose to overwrite or skip existing `_denoised.wav` files.
overwrite_option = "跳過 / Skip"  #@param ["跳過 / Skip","覆蓋 / Overwrite"] {type:"string"}
# Turn the dropdown label into a boolean: True means existing outputs are replaced.
overwrite_existing = overwrite_option == "覆蓋 / Overwrite"

print("📦 正在安裝套件 / Installing packages...")
# Install the system ffmpeg binary (important: it is invoked via subprocess
# to extract audio from video files).
!apt-get -y update -qq
!apt-get -y install -qq ffmpeg
# Python packages
!pip install -q speechbrain torchaudio ffmpeg-python soundfile
# The `denoiser` package is only installed when the DNS64 model is selected.
if model_choice == "DNS64":
    !pip install -q denoiser
print("✅ 套件安裝完成 / Packages installed!")

import os, glob, subprocess, torch, torchaudio, tempfile
def log(msg):
    """Print *msg* only while developer mode is switched on.

    Reads the module-level ``developer_mode`` flag at call time; when the
    flag is false the message is dropped silently.
    """
    if not developer_mode:
        return
    print(msg)
from speechbrain.pretrained import SpectralMaskEnhancement
import torchaudio.transforms as T
if model_choice == "DNS64":
    from denoiser import pretrained

# File extensions (lower-case, leading dot) recognised as inputs.
# Video files get their audio track extracted with ffmpeg before denoising;
# audio files are loaded directly. Both stay lists because they are
# concatenated with `+` when filtering a folder.
video_exts = [
    '.mp4',
    '.mov',
    '.avi',
    '.mkv',
    '.flv',
    '.webm',
]
audio_exts = [
    '.wav',
    '.mp3',
    '.flac',
    '.ogg',
    '.m4a',
]

# Inference only: switch autograd off globally so no gradients are tracked.
torch.set_grad_enabled(False)

def extract_audio(src, tmp_dir="/content"):
    """Extract the audio track of *src* into a temporary 16 kHz mono PCM WAV.

    The caller owns the returned file and must delete it when done. If
    extraction fails for any reason, the temporary file is removed before
    the exception propagates, so failed runs leave nothing behind.

    Args:
        src: Path of the input media file handed to ffmpeg via ``-i``.
        tmp_dir: Directory in which the temporary WAV is created. Defaults
            to ``/content`` (the Colab working directory).

    Returns:
        Path of the newly created temporary ``.wav`` file.

    Raises:
        subprocess.CalledProcessError: ffmpeg exited with a non-zero status.
        RuntimeError: ffmpeg reported success but the output file is missing
            or empty.
    """
    fd, tmp_path = tempfile.mkstemp(prefix="tmp_input_", suffix=".wav", dir=tmp_dir)
    # Only the unique name is needed; ffmpeg re-opens the path itself (-y).
    os.close(fd)
    cmd = [
        'ffmpeg', '-y', '-hide_banner', '-loglevel', 'error',
        '-i', src,
        '-vn',                   # drop the video stream
        '-ac', '1',              # mono
        '-ar', '16000',          # 16 kHz
        '-acodec', 'pcm_s16le',  # 16-bit PCM
        tmp_path,
    ]
    try:
        subprocess.run(cmd, check=True)
        if not os.path.exists(tmp_path) or os.path.getsize(tmp_path) == 0:
            raise RuntimeError(f"ffmpeg 輸出檔案異常：{tmp_path}")
    except BaseException:
        # Do not leak the placeholder created by mkstemp when extraction
        # fails (or is interrupted); the original error is always re-raised.
        try:
            os.remove(tmp_path)
        except OSError:
            pass  # already gone or not removable; the original error matters more
        raise
    return tmp_path

def denoise_file(src, enhance_fn):
    """Denoise one audio/video file and write ``<stem>_denoised.wav`` beside it.

    Pipeline: (video only) extract audio with ffmpeg -> load -> downmix to
    mono -> resample to 16 kHz -> enhance -> clamp to [-1, 1] -> save.

    Failures are reported with ``print`` and swallowed so a batch run can
    continue with the next file; nothing is raised to the caller.

    Args:
        src: Path of the input file. Files whose extension is listed in
            ``video_exts`` are first routed through ``extract_audio``.
        enhance_fn: Callable that receives the float32 waveform tensor
            (shape ``[1, T]``, already moved to ``device``) and returns the
            enhanced waveform.

    Returns:
        None.
    """
    print(f"▶️ 開始處理 / Processing: {src}")

    # Decide about skipping BEFORE any expensive work (ffmpeg extraction,
    # model inference) instead of after the result has been computed.
    out_file = os.path.splitext(src)[0] + '_denoised.wav'
    if os.path.exists(out_file) and not overwrite_existing:
        print(f"⏭️ 已存在檔案，跳過 / File exists, skipping: {out_file}")
        return

    ext = os.path.splitext(src)[1].lower()
    processed_input = src
    tmp_to_cleanup = None

    if ext in video_exts:
        try:
            processed_input = extract_audio(src)
            tmp_to_cleanup = processed_input
        except subprocess.CalledProcessError as e:
            print(f"❌ 影片抽音失敗（ffmpeg）{src}: {e}")
            return
        except Exception as e:
            print(f"❌ 影片抽音發生錯誤 {src}: {e}")
            return

    # Pre-bind so the cleanup in `finally` can always drop the references.
    audio = enhanced = None
    try:
        log(f"載入音訊 / Loading audio: {processed_input}")
        audio, sr = torchaudio.load(processed_input)  # [channels, time]
        log(f"原始取樣率 / Original sample rate: {sr}")

        # Downmix to mono (safety net for multi-channel audio files).
        if audio.dim() == 2 and audio.shape[0] > 1:
            audio = torch.mean(audio, dim=0, keepdim=True)  # [1, T]

        # Resample to 16 kHz, the rate the models expect.
        if sr != 16000:
            resampler = T.Resample(orig_freq=sr, new_freq=16000)
            audio = resampler(audio)
            sr = 16000

        # [1, T] doubles as [batch=1, time] for the enhancement call.
        audio = audio.to(torch.float32).to(device)
        if device == "cuda":
            log(f"GPU 記憶體使用 / GPU memory: {torch.cuda.memory_allocated()/1024**2:.2f} MB")

        enhanced = enhance_fn(audio)

        # torchaudio.save needs [channels, time]; normalise the result shape.
        to_save = enhanced.detach().cpu()
        if to_save.dim() != 2:
            # e.g. a bare [T] vector: add the channel dimension.
            to_save = to_save.unsqueeze(0)
        elif to_save.shape[0] != 1:
            # Unexpected [B, T] with B > 1: keep the first row only.
            to_save = to_save[:1]

        # Clamp to [-1, 1] to avoid overflow when the file is written.
        to_save = torch.clamp(to_save, -1.0, 1.0)

        if os.path.exists(out_file):
            print(f"♻️ 覆蓋已存在檔案 / Overwriting existing file: {out_file}")
        torchaudio.save(out_file, to_save, sr)
        print(f"🎵 已處理 / Done: {out_file}")

    except RuntimeError as e:
        print(f"❌ 無法載入或處理音訊檔案 {processed_input}: {e}")
    except Exception as e:
        print(f"❌ 處理檔案時發生錯誤 {src}: {e}")
    finally:
        if tmp_to_cleanup and os.path.exists(tmp_to_cleanup):
            try:
                os.remove(tmp_to_cleanup)
            except OSError as e:
                # Best effort: a leftover temp file must not fail the run.
                log(f"⚠️ 無法刪除暫存檔 / Could not remove temp file {tmp_to_cleanup}: {e}")
        # Drop tensor references first so empty_cache() can release their memory.
        audio = enhanced = None
        if device == 'cuda':
            torch.cuda.empty_cache()
            log('🧹 已清除 GPU 記憶體 / GPU memory cleared')

# Pick the compute device: CUDA only when it was requested AND is available.
if use_gpu and torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print(f"💻 使用裝置 / Device: {device}")

# Build `enhance_fn`, the single-argument callable handed to `denoise_file`.
if model_choice == 'MetricGAN+':
    enhancer = SpectralMaskEnhancement.from_hparams(
        source='speechbrain/metricgan-plus-voicebank',
        savedir='pretrained_models/metricgan-plus-voicebank',
        run_opts={'device': device},
    )

    def enhance_fn(wav):
        """Enhance *wav* with MetricGAN+, treating the whole signal as valid."""
        # enhance_batch takes relative lengths; 1.0 marks the full clip as valid.
        full_length = torch.tensor([1.0], device=wav.device)
        return enhancer.enhance_batch(wav, lengths=full_length)
else:
    dns_model = pretrained.dns64().to(device)
    dns_model.eval()

    def enhance_fn(wav):
        """Enhance *wav* with DNS64 and return the first item of the output."""
        denoised = dns_model(wav)
        return denoised[0]

# Batch (folder) or single-file processing.
if os.path.isdir(input_path):
    print('📁 偵測到資料夾，開始批次處理 / Folder detected, processing all files...')
    # Build the extension lookup once instead of once per candidate file.
    supported_exts = set(video_exts) | set(audio_exts)
    # glob.escape: folder names containing [, ], * or ? must be matched
    # literally, otherwise glob reads them as wildcards and finds nothing.
    pattern = os.path.join(glob.escape(input_path), '*')
    files = sorted(
        f for f in glob.glob(pattern)
        if os.path.isfile(f)  # ignore sub-folders that happen to look like media
        and os.path.splitext(f)[1].lower() in supported_exts
        and not f.endswith('_denoised.wav')  # never re-process our own output
    )
    if not files:
        print(f"⚠️ 資料夾中沒有可處理的檔案 / No supported files found in: {input_path}")
    for f in files:
        denoise_file(f, enhance_fn)
elif os.path.exists(input_path):
    if input_path.endswith('_denoised.wav'):
        print(f"⏭️ 已是降噪後的檔案，跳過 / Already denoised, skipping: {input_path}")
    else:
        denoise_file(input_path, enhance_fn)
else:
    print(f"⚠️ 找不到路徑：{input_path}")

print('🎉 全部搞定 / All done!')

