In [None]:
pip install librosa pandas torch torchaudio transformers tqdm numpy

In [1]:
import os
import subprocess

def convert_audio(input_dir, output_dir):
    """Convert every ``.wav`` file under ``input_dir`` to 16 kHz, 16-bit signed PCM.

    The directory tree below ``input_dir`` is mirrored under ``output_dir`` and
    every file keeps its original name. The conversion itself is delegated to
    the external ``sox`` command-line tool.

    Args:
        input_dir: Root directory that is searched recursively for ``.wav`` files.
        output_dir: Root directory for the converted files; created if missing.

    Raises:
        subprocess.CalledProcessError: If ``sox`` exits with a non-zero status
            (``check=True``); the remaining files are then not processed.
    """
    # exist_ok avoids the check-then-create race of a separate os.path.exists test.
    os.makedirs(output_dir, exist_ok=True)

    for root, _, files in os.walk(input_dir):
        for file in files:
            if not file.endswith(".wav"):
                continue

            input_path = os.path.join(root, file)

            # Mirror the sub-directory layout. normpath removes the "./" segment
            # that relpath yields for files located directly inside input_dir.
            relative_path = os.path.relpath(root, input_dir)
            output_folder = os.path.normpath(os.path.join(output_dir, relative_path))
            os.makedirs(output_folder, exist_ok=True)

            output_path = os.path.join(output_folder, file)

            # Passing an argument list (no shell) keeps unusual file names safe.
            command = [
                "sox", input_path,
                "-r", "16000",           # 16 kHz sample rate
                "-e", "signed-integer",  # signed-integer encoding
                "-b", "16",              # 16 bits per sample
                output_path
            ]

            print(f"Converting: {input_path} -> {output_path}")
            subprocess.run(command, check=True)

# Source and destination folders for each dataset split.
train_input_dir, train_output_dir = "train", "train_converted"
test_input_dir, test_output_dir = "test", "test_converted"

# Convert the audio of the train split first, then the test split.
for _src_dir, _dst_dir in (
    (train_input_dir, train_output_dir),
    (test_input_dir, test_output_dir),
):
    convert_audio(_src_dir, _dst_dir)

Converting: train/2392.wav -> train_converted/./2392.wav
Converting: train/1314.wav -> train_converted/./1314.wav
Converting: train/1026.wav -> train_converted/./1026.wav
Converting: train/1142.wav -> train_converted/./1142.wav
Converting: train/1802.wav -> train_converted/./1802.wav
Converting: train/148.wav -> train_converted/./148.wav
Converting: train/1098.wav -> train_converted/./1098.wav
Converting: train/2077.wav -> train_converted/./2077.wav
Converting: train/1367.wav -> train_converted/./1367.wav
Converting: train/3087.wav -> train_converted/./3087.wav
Converting: train/1385.wav -> train_converted/./1385.wav
Converting: train/1018.wav -> train_converted/./1018.wav
Converting: train/1784.wav -> train_converted/./1784.wav
Converting: train/2966.wav -> train_converted/./2966.wav
Converting: train/340.wav -> train_converted/./340.wav
Converting: train/2414.wav -> train_converted/./2414.wav
Converting: train/1256.wav -> train_converted/./1256.wav
Converting: train/1188.wav -> train

讀取字典檔，建立 台羅拼音->音素 序列的對應表

In [2]:
import os
import csv
import librosa
import numpy as np
from sklearn.model_selection import train_test_split

def load_lexicon(lexicon_path):
    """Read a pronunciation lexicon into a ``{word: [phone, ...]}`` dict.

    Every non-blank line is split on whitespace: the first field is the word
    (for example ``ba``) and the remaining fields are its phones (for example
    ``b a``). A word that appears on several lines keeps the phones of its
    last occurrence.

    Args:
        lexicon_path: Path to the UTF-8 encoded lexicon text file.

    Returns:
        dict mapping each word to the list of its phone strings.
    """
    with open(lexicon_path, 'r', encoding='utf-8') as lexicon_file:
        # str.split() with no argument already drops surrounding whitespace and
        # yields an empty list for blank lines, which the filter below skips.
        tokenized_lines = (raw_line.split() for raw_line in lexicon_file)
        return {fields[0]: fields[1:] for fields in tokenized_lines if fields}

將台羅拼音句子轉為音素序列

In [3]:
def text_to_phoneme_sequence(text, lex_map):
    """Expand a whitespace-separated sentence into a flat phoneme list.

    Example input: ``"li be e mih kiann lan lan san san long be tsiau tsng"``.

    Args:
        text: Sentence whose words are separated by whitespace.
        lex_map: Mapping from word to its list of phones.

    Returns:
        list with the phones of every word in order. A word that is missing
        from ``lex_map`` is kept unchanged as a single token, so the lexicon
        should be complete for the result to contain phones only.
    """
    phones = []
    for token in text.strip().split():
        # Fall back to the word itself when the lexicon has no entry for it.
        phones.extend(lex_map.get(token, [token]))
    return phones

對 log_mel_spec 進行時間與頻率維度遮罩

In [4]:
def spec_augment(log_mel_spec, time_mask_num=1, freq_mask_num=1, time_mask_size=20, freq_mask_size=10):
    """Zero out random frequency bands and time spans of a spectrogram.

    The array is modified IN PLACE and also returned; pass a copy when the
    unmasked features must be kept (e.g. features cached across epochs).

    Args:
        log_mel_spec: 2-D NumPy array indexed as (n_mels, T).
        time_mask_num: Number of time masks to apply.
        freq_mask_num: Number of frequency masks to apply.
        time_mask_size: Exclusive upper bound on the width of a time mask.
        freq_mask_size: Exclusive upper bound on the width of a frequency mask.

    Returns:
        The same array object, with the masked regions set to 0.
    """
    # log_mel_spec: shape (n_mels, T)
    n_mels, T = log_mel_spec.shape

    # Clamp the mask widths to the array size. Without this, a spectrogram that
    # is shorter/narrower than the drawn width makes np.random.randint raise
    # ValueError (high <= low). For inputs larger than the mask size the bounds
    # are unchanged, so seeded runs draw exactly the same masks as before.
    max_freq_width = min(freq_mask_size, n_mels)
    max_time_width = min(time_mask_size, T)

    # Frequency masks (skipped when the size is 0 or the axis is empty).
    if max_freq_width > 0:
        for _ in range(freq_mask_num):
            f = np.random.randint(0, max_freq_width)
            f_start = np.random.randint(0, n_mels - f)
            log_mel_spec[f_start:f_start + f, :] = 0

    # Time masks (skipped when the size is 0 or the axis is empty).
    if max_time_width > 0:
        for _ in range(time_mask_num):
            t = np.random.randint(0, max_time_width)
            t_start = np.random.randint(0, T - t)
            log_mel_spec[:, t_start:t_start + t] = 0

    return log_mel_spec

載入音檔並萃取特徵(Mel-spectrogram)

In [6]:
def load_audio_features(wav_path, sr=16000, n_mels=80, frame_length=0.025, frame_shift=0.01):
    """Load an audio file and compute its log-Mel spectrogram.

    Args:
        wav_path: Path of the audio file to read.
        sr: Sample rate passed to ``librosa.load``.
        n_mels: Number of Mel bands.
        frame_length: Analysis window length in seconds (also used as n_fft).
        frame_shift: Hop between consecutive frames in seconds.

    Returns:
        Natural log of the Mel spectrogram returned by librosa, with values
        floored at 1e-10 before the log so that log(0) never occurs.
    """
    # Read the waveform; `sr` is rebound to the rate librosa reports back.
    samples, sr = librosa.load(wav_path, sr=sr)

    # Window and hop sizes expressed in samples; win_length equals n_fft.
    window_samples = int(sr * frame_length)
    hop_samples = int(sr * frame_shift)

    # Mel filter-bank energies between 20 Hz and the Nyquist frequency.
    mel_energy = librosa.feature.melspectrogram(
        y=samples,
        sr=sr,
        n_fft=window_samples,
        hop_length=hop_samples,
        win_length=window_samples,
        n_mels=n_mels,
        fmin=20,
        fmax=sr/2,
    )
    return np.log(np.maximum(mel_energy, 1e-10))

從 train-toneless.csv 讀取資料，處理成最終 (特徵, 標籤序列)清單

In [7]:
def preprocess_data(entries, wav_dir, lexicon_map):
    """Turn CSV rows into parallel lists of features and phoneme sequences.

    Args:
        entries: Iterable of dict-like rows providing the keys ``'id'`` and ``'text'``.
        wav_dir: Directory that holds ``<id>.wav`` for every row.
        lexicon_map: Word -> phone-list mapping passed to
            ``text_to_phoneme_sequence``.

    Returns:
        ``(features_list, labels_list)``; rows whose audio file does not exist
        are reported with a warning and left out of both lists.
    """
    features_list, labels_list = [], []
    for row in entries:
        utt_id, text = row['id'], row['text']
        wav_path = os.path.join(wav_dir, utt_id + '.wav')
        if os.path.exists(wav_path):
            features_list.append(load_audio_features(wav_path))
            labels_list.append(text_to_phoneme_sequence(text, lexicon_map))
        else:
            print(f"Warning: {wav_path} not found.")
    return features_list, labels_list

資料分割

In [8]:
if __name__ == "__main__":
    lexicon_path = 'lexicon.txt'
    train_csv_path = 'train-toneless.csv'
    wav_dir = 'train_converted'

    # Load the pronunciation lexicon (word -> phone list).
    lex_map = load_lexicon(lexicon_path)

    # Read the whole training CSV, one dict per utterance.
    # newline='' is what the csv module expects for files it parses itself.
    with open(train_csv_path, 'r', encoding='utf-8', newline='') as f:
        entries = list(csv.DictReader(f))

    # Split: 90% for training, 10% for validation.
    train_entries, valid_entries = train_test_split(entries, test_size=0.1, random_state=42)

    # Preprocess the training split.
    train_features, train_labels = preprocess_data(train_entries, wav_dir, lex_map)
    # Preprocess the validation split.
    valid_features, valid_labels = preprocess_data(valid_entries, wav_dir, lex_map)

    # Build the phoneme -> id table from every phoneme seen in either split.
    # Id 0 is reserved for the CTC blank: the loss is created with blank=0 and
    # the greedy decoder drops id 0, so a real phone must never receive it.
    # Real phones are therefore numbered from 1 in sorted order.
    # NOTE: this changes the ids (and the model's output size) compared with a
    # table numbered from 0, so checkpoints trained with the old table do not
    # match the new one.
    all_phones = set(p for seq in train_labels + valid_labels for p in seq)
    all_phones.discard("<blank>")
    phone2id = {"<blank>": 0}
    phone2id.update({p: i for i, p in enumerate(sorted(all_phones), start=1)})

    # Convert the training and validation labels to id sequences.
    train_labels_id = [[phone2id[p] for p in seq] for seq in train_labels]
    valid_labels_id = [[phone2id[p] for p in seq] for seq in valid_labels]

    # At this point the following are available for model training/validation:
    # train_features, train_labels_id
    # valid_features, valid_labels_id

訓練模型（雙向 LSTM + CTC）

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
import numpy as np
from tqdm import tqdm

# Inputs expected from the preprocessing cells above:
# train_features: list of np.ndarray, each shaped like (n_mels, T_frames)
# train_labels_id: list of list[int]
# valid_features: same structure as train_features
# valid_labels_id: same structure as train_labels_id
# phone2id: dict {phone_str: id_int}; id 0 must be reserved for the CTC blank
#           ("<blank>") because the loss below is created with blank=0

class SpeechDataset(Dataset):
    """Dataset pairing per-utterance feature matrices with label-id sequences."""

    def __init__(self, features_list, labels_list):
        """Store the parallel lists of feature arrays and label-id lists."""
        self.features_list, self.labels_list = features_list, labels_list

    def __len__(self):
        """Return the number of stored feature arrays."""
        return len(self.features_list)

    def __getitem__(self, idx):
        """Return ``(features, labels)`` for utterance ``idx`` as tensors.

        The stored array is transposed, so an (n_mels, T) array comes back as
        a float32 tensor of shape (T, n_mels); labels become an int64 tensor.
        """
        time_major = self.features_list[idx].T  # (T, n_mels)
        label_ids = self.labels_list[idx]
        return (
            torch.tensor(time_major, dtype=torch.float32),
            torch.tensor(label_ids, dtype=torch.int64),
        )

def collate_fn(batch):
    """Zero-pad a list of ``(features, labels)`` pairs into batch tensors.

    Args:
        batch: Sequence of ``(feat, label)`` tensors, ``feat`` shaped
            (T, n_mels) and ``label`` shaped (L,).

    Returns:
        ``(feats_padded, labels_padded, feat_lengths, label_lengths)`` where the
        padded tensors are (B, max_T, n_mels) and (B, max_L) and the two length
        tensors are int64 and hold the unpadded sizes.
    """
    feat_lengths = [sample[0].shape[0] for sample in batch]
    label_lengths = [sample[1].shape[0] for sample in batch]
    longest_feat = max(feat_lengths)
    longest_label = max(label_lengths)

    # The feature dimension is taken from the first sample of the batch.
    n_mels = batch[0][0].shape[1]
    batch_size = len(batch)

    feats_padded = torch.zeros(batch_size, longest_feat, n_mels)
    labels_padded = torch.zeros(batch_size, longest_label, dtype=torch.int64)

    # Copy each sample into the leading part of its row; the rest stays zero.
    for row, (feat, label) in enumerate(batch):
        feats_padded[row, :feat.shape[0], :] = feat
        labels_padded[row, :label.shape[0]] = label

    feat_lengths_t = torch.tensor(feat_lengths, dtype=torch.int64)
    label_lengths_t = torch.tensor(label_lengths, dtype=torch.int64)
    return feats_padded, labels_padded, feat_lengths_t, label_lengths_t


# Wrap the preprocessed splits so they can be batched.
train_dataset = SpeechDataset(features_list=train_features, labels_list=train_labels_id)
valid_dataset = SpeechDataset(features_list=valid_features, labels_list=valid_labels_id)

# Both loaders share batch size and padding; only the training loader shuffles.
_loader_options = {"batch_size": 8, "collate_fn": collate_fn}
train_loader = DataLoader(train_dataset, shuffle=True, **_loader_options)
valid_loader = DataLoader(valid_dataset, shuffle=False, **_loader_options)


class RNNCTCModel(nn.Module):
    """Bidirectional multi-layer LSTM followed by a per-frame linear layer."""

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=3):
        """Build the LSTM encoder (``rnn``) and the output projection (``fc``).

        Args:
            input_dim: Size of each input frame.
            hidden_dim: Hidden size per LSTM direction.
            output_dim: Number of output classes per frame.
            num_layers: Number of stacked LSTM layers.
        """
        super().__init__()
        self.rnn = nn.LSTM(
            input_size=input_dim,
            hidden_size=hidden_dim,
            num_layers=num_layers,
            batch_first=True,
            bidirectional=True,
        )
        # Both directions are concatenated, hence twice the hidden size.
        self.fc = nn.Linear(in_features=hidden_dim*2, out_features=output_dim)

    def forward(self, x):
        """Map (batch, T, input_dim) features to (T, batch, output_dim) logits."""
        encoded = self.rnn(x)[0]              # (batch, T, hidden_dim*2)
        frame_logits = self.fc(encoded)       # (batch, T, output_dim)
        return frame_logits.permute(1, 0, 2)  # (T, batch, output_dim)

# Use the GPU when one is available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

n_mels = train_features[0].shape[0]  # feature dimension of one frame
output_dim = len(phone2id)           # one output class per phone2id entry
hidden_dim = 256
num_layers = 3

model = RNNCTCModel(input_dim=n_mels, hidden_dim=hidden_dim, output_dim=output_dim, num_layers=num_layers).to(device)

# blank=0: class index 0 is interpreted as the CTC blank, so phone2id must not
# assign id 0 to a real phone.
# zero_infinity=True: an utterance whose labels cannot be aligned to its frames
# yields an infinite loss; zeroing it (and its gradient) keeps one such sample
# from turning the whole batch loss into inf.
criterion = nn.CTCLoss(blank=0, zero_infinity=True)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

def train_one_epoch(model, dataloader, criterion, optimizer, device):
    """Run one optimisation pass over ``dataloader``.

    Args:
        model: Network returning logits shaped (T, batch, output_dim).
        dataloader: Yields ``(feats, labels, feat_lens, label_lens)`` batches.
        criterion: Loss called as ``criterion(log_probs, labels, feat_lens, label_lens)``.
        optimizer: Optimizer stepped once per batch.
        device: Device the features and labels are moved to.

    Returns:
        Sum of ``batch loss * batch size`` divided by the number of samples seen.
    """
    model.train()
    loss_sum, samples_seen = 0.0, 0
    progress = tqdm(dataloader, desc="Training", leave=False)

    for feats, labels, feat_lens, label_lens in progress:
        feats, labels = feats.to(device), labels.to(device)

        optimizer.zero_grad()
        log_probs = model(feats).log_softmax(dim=-1)  # (T, batch, output_dim)
        batch_loss = criterion(log_probs, labels, feat_lens, label_lens)
        batch_loss.backward()
        optimizer.step()

        # Weight each batch by its size so a smaller last batch counts less.
        loss_value = batch_loss.item()
        n_in_batch = feats.size(0)
        loss_sum += loss_value * n_in_batch
        samples_seen += n_in_batch
        progress.set_postfix(loss=f"{loss_value:.4f}")

    return loss_sum / samples_seen

def validate(model, dataloader, criterion, device):
    """Compute the size-weighted mean loss over ``dataloader`` without training.

    Args:
        model: Network returning logits shaped (T, batch, output_dim).
        dataloader: Yields ``(feats, labels, feat_lens, label_lens)`` batches.
        criterion: Loss called as ``criterion(log_probs, labels, feat_lens, label_lens)``.
        device: Device the features and labels are moved to.

    Returns:
        Sum of ``batch loss * batch size`` divided by the number of samples seen.
    """
    model.eval()
    loss_sum, samples_seen = 0.0, 0
    progress = tqdm(dataloader, desc="Validating", leave=False)

    # Gradients are not needed for evaluation.
    with torch.no_grad():
        for feats, labels, feat_lens, label_lens in progress:
            feats, labels = feats.to(device), labels.to(device)

            log_probs = model(feats).log_softmax(dim=-1)
            loss_value = criterion(log_probs, labels, feat_lens, label_lens).item()

            n_in_batch = feats.size(0)
            loss_sum += loss_value * n_in_batch
            samples_seen += n_in_batch
            progress.set_postfix(val_loss=f"{loss_value:.4f}")

    return loss_sum / samples_seen

# Early Stopping 機制
class EarlyStopping:
    """Track the validation loss, checkpoint the best model and flag when to stop.

    Call the instance once per epoch with the validation loss and the model.
    A loss counts as an improvement when it is at most ``best_loss - delta``;
    the model is then saved to ``save_path``. After ``patience`` calls in a row
    without improvement, ``early_stop`` becomes True.
    """

    def __init__(self, patience=3, delta=0.0, save_path="best_model.pth"):
        """
        Args:
            patience: Number of consecutive non-improving calls tolerated.
            delta: Minimum decrease of the loss that counts as an improvement.
            save_path: File the best model's ``state_dict`` is written to.
        """
        self.patience = patience
        self.delta = delta
        self.save_path = save_path
        self.counter = 0          # consecutive calls without improvement
        self.best_loss = None     # lowest validation loss seen so far
        self.early_stop = False   # set to True once patience is exhausted

    def __call__(self, val_loss, model):
        """Record ``val_loss``; save ``model`` when it improves on the best loss."""
        # NaN compares False against everything, so it must be rejected
        # explicitly. Otherwise a diverged epoch would be taken for an
        # improvement, overwrite the best checkpoint and make every later
        # comparison meaningless. NaN is the only value unequal to itself.
        is_nan = val_loss != val_loss

        improved = (not is_nan) and (
            self.best_loss is None or val_loss <= self.best_loss - self.delta
        )

        if improved:
            self.best_loss = val_loss
            self.save_checkpoint(val_loss, model)
            self.counter = 0
        else:
            self.counter += 1
            if self.counter >= self.patience:
                self.early_stop = True

    def save_checkpoint(self, val_loss, model):
        """Write the model's ``state_dict`` to ``save_path``."""
        torch.save(model.state_dict(), self.save_path)

num_epochs = 10
# NOTE(review): patience (50) is larger than num_epochs (10), so the early-stop
# branch below cannot trigger in this run; the object still saves the best
# checkpoint to "best_model.pth" whenever the validation loss improves.
early_stopping = EarlyStopping(patience=50, delta=0.0, save_path="best_model.pth")

# One training pass and one validation pass per epoch.
for epoch in range(num_epochs):
    print(f"Epoch {epoch+1}/{num_epochs}")
    train_loss = train_one_epoch(model, train_loader, criterion, optimizer, device)
    valid_loss = validate(model, valid_loader, criterion, device)
    print(f"Train Loss: {train_loss:.4f}, Valid Loss: {valid_loss:.4f}")

    # Early-stopping check (also checkpoints the model when the loss improves).
    early_stopping(valid_loss, model)
    if early_stopping.early_stop:
        print("Early stopping!")
        break

# After training, the best model has already been saved to "best_model.pth" by EarlyStopping.
# To keep the final model state as well, additionally call torch.save(model.state_dict(), "final_model.pth").


預測測試集

In [None]:
import os
import csv
import torch
import torch.nn as nn
import numpy as np
import librosa

# Assumes the following variables already exist in the kernel:
# model: the trained RNNCTCModel (the best weights are loaded at the end of this cell)
# device: torch.device
# phone2id: dict {phone_str: id_int}; decoding below treats id 0 as the CTC blank
#           NOTE(review): confirm that phone2id really reserves id 0 for "<blank>"
# Inverse table used to turn predicted ids back into phone strings.
id2phone = {v: k for k, v in phone2id.items()}

def load_audio_features(wav_path, sr=16000, n_mels=80, frame_length=0.025, frame_shift=0.01):
    """Load an audio file and compute its log-Mel spectrogram for inference.

    This repeats the feature extraction defined earlier in the notebook; the
    two definitions must stay identical so that test features match the
    features the model was trained on.

    Args:
        wav_path: Path of the audio file to read.
        sr: Sample rate passed to ``librosa.load``.
        n_mels: Number of Mel bands.
        frame_length: Analysis window length in seconds (also used as n_fft).
        frame_shift: Hop between consecutive frames in seconds.

    Returns:
        Natural log of the Mel spectrogram returned by librosa, with values
        floored at 1e-10 before the log.
    """
    samples, sr = librosa.load(wav_path, sr=sr)

    # Window and hop sizes expressed in samples; win_length equals n_fft.
    window_samples = int(sr * frame_length)
    hop_samples = int(sr * frame_shift)

    mel_energy = librosa.feature.melspectrogram(
        y=samples,
        sr=sr,
        n_fft=window_samples,
        hop_length=hop_samples,
        win_length=window_samples,
        n_mels=n_mels,
        fmin=20,
        fmax=sr/2,
    )
    return np.log(np.maximum(mel_energy, 1e-10))

def ctc_greedy_decode(logits, blank_id=0):
    """Greedy (best-path) CTC decoding.

    Takes the arg-max class of every frame, merges consecutive repeats and
    drops the blank symbol.

    Args:
        logits: Tensor shaped (T, batch, output_dim); log-probabilities work
            equally well because only the arg-max is used.
        blank_id: Class index of the CTC blank.

    Returns:
        list with one list of int class ids per batch element.
    """
    best_paths = torch.argmax(logits, dim=-1).transpose(0, 1)  # (batch, T)

    decoded = []
    for path in best_paths:
        frame_ids = path.tolist()
        # Pair every frame with its predecessor; the first frame is compared
        # against the blank so that a leading non-blank symbol is kept.
        predecessors = [blank_id] + frame_ids[:-1]
        decoded.append([
            current
            for previous, current in zip(predecessors, frame_ids)
            if current != blank_id and current != previous
        ])
    return decoded

def inference(model, test_wav_dir, sample_csv_path, output_csv_path):
    """Decode every utterance listed in a CSV file and write the predictions.

    For each ``id`` in ``sample_csv_path`` the file ``<id>.wav`` is read from
    ``test_wav_dir``, passed through ``model`` and greedily CTC-decoded; the
    predicted phones are joined with spaces. Relies on the notebook-level
    names ``device``, ``id2phone``, ``load_audio_features`` and
    ``ctc_greedy_decode``.

    Args:
        model: Trained network returning logits shaped (T, batch, output_dim).
        test_wav_dir: Directory that holds the test audio files.
        sample_csv_path: CSV with an ``id`` column (columns: id, text).
        output_csv_path: Destination CSV with the columns ``id`` and ``text``.
    """
    with open(sample_csv_path, 'r', encoding='utf-8') as f:
        test_entries = list(csv.DictReader(f))

    model.eval()

    # The feature size is fixed by load_audio_features, so nothing is read from
    # the training data here; inference also works in a kernel where only the
    # model and phone table have been restored.
    predictions = []

    with torch.no_grad():
        for entry in test_entries:
            utt_id = entry["id"]

            wav_path = os.path.join(test_wav_dir, utt_id + ".wav")
            if not os.path.exists(wav_path):
                print(f"Warning: {wav_path} not found.")
                # Keep the row with an empty prediction so the output still
                # contains one line per requested id.
                predictions.append({"id": utt_id, "text": ""})
                continue

            feats = load_audio_features(wav_path)
            feats_t = torch.tensor(feats.T, dtype=torch.float32).unsqueeze(0).to(device)  # (1, T, n_mels)

            logits = model(feats_t)  # (T, batch, output_dim)
            log_probs = torch.nn.functional.log_softmax(logits, dim=-1)
            pred_ids = ctc_greedy_decode(log_probs, blank_id=0)[0]

            pred_text = " ".join(id2phone[i] for i in pred_ids)
            predictions.append({"id": utt_id, "text": pred_text})

    with open(output_csv_path, 'w', encoding='utf-8', newline='') as f:
        writer = csv.DictWriter(f, fieldnames=["id", "text"])
        writer.writeheader()
        writer.writerows(predictions)

# Restore the best checkpoint written during training and place the model on the target device.
best_state_dict = torch.load("best_model.pth", map_location=device)
model.load_state_dict(best_state_dict)
model.to(device)

# Run inference on the test set (no Levenshtein distance is computed here).
inference(
    model=model,
    test_wav_dir="test_converted",
    sample_csv_path="sample.csv",
    output_csv_path="sample_pred.csv",
)
