<a href="https://colab.research.google.com/github/Death-of-math/-/blob/main/HW02/HW02-1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Homework 2-1 Phoneme Classification**

* Slides: https://speech.ee.ntu.edu.tw/~hylee/ml/ml2021-course-data/hw/HW02/HW02.pdf
* Video (Chinese): https://youtu.be/PdjXnQbu2zo
* Video (English): https://youtu.be/ESRr-VCykBs


## The DARPA TIMIT Acoustic-Phonetic Continuous Speech Corpus (TIMIT)
The TIMIT corpus of reading speech has been designed to provide speech data for the acquisition of acoustic-phonetic knowledge and for the development and evaluation of automatic speech recognition systems.

This homework is a multiclass classification task,
we are going to train a deep neural network classifier to predict the phonemes for each frame from the speech corpus TIMIT.

link: https://academictorrents.com/details/34e2b78745138186976cbc27939b1b34d18bd5b3

## Download Data
Download data from google drive, then unzip it.

You should have `timit_11/train_11.npy`, `timit_11/train_label_11.npy`, and `timit_11/test_11.npy` after running this block.<br><br>
`timit_11/`
- `train_11.npy`: training data<br>
- `train_label_11.npy`: training label<br>
- `test_11.npy`:  testing data<br><br>

**notes: if the google drive link is dead, you can download the data directly from Kaggle and upload it to the workspace**




In [1]:
!gdown --id '1HPkcmQmFGu-3OknddKIa5dNDsR05lIQR' --output data.zip
!unzip data.zip
!ls

Downloading...
From (original): https://drive.google.com/uc?id=1HPkcmQmFGu-3OknddKIa5dNDsR05lIQR
From (redirected): https://drive.google.com/uc?id=1HPkcmQmFGu-3OknddKIa5dNDsR05lIQR&confirm=t&uuid=e05f7712-062c-4796-b926-300f6fbed094
To: /content/data.zip
100% 372M/372M [00:02<00:00, 135MB/s]
Archive:  data.zip
   creating: timit_11/
  inflating: timit_11/train_11.npy   
  inflating: timit_11/test_11.npy    
  inflating: timit_11/train_label_11.npy  
data.zip  sample_data  timit_11


## Preparing Data
Load the training and testing data from the `.npy` file (NumPy array).

In [2]:
import numpy as np

print('Loading data ...')

data_root='./timit_11/'
train = np.load(data_root + 'train_11.npy')
train_label = np.load(data_root + 'train_label_11.npy')
test = np.load(data_root + 'test_11.npy')

print('Size of training data: {}'.format(train.shape))
print('Size of testing data: {}'.format(test.shape))

Loading data ...
Size of training data: (1229932, 429)
Size of testing data: (451552, 429)


## Create Dataset

In [3]:
import torch
from torch.utils.data import Dataset

class TIMITDataset(Dataset):
    def __init__(self, X, y=None):
        self.data = torch.from_numpy(X).float()
        if y is not None:
            y = y.astype(int)  # ✅ 修复 np.int 报错
            self.label = torch.LongTensor(y)
        else:
            self.label = None

    def __getitem__(self, idx):
        if self.label is not None:
            return self.data[idx], self.label[idx]
        else:
            return self.data[idx]

    def __len__(self):
        return len(self.data)


Split the labeled data into a training set and a validation set, you can modify the variable `VAL_RATIO` to change the ratio of validation data.

In [4]:
VAL_RATIO = 0.2

percent = int(train.shape[0] * (1 - VAL_RATIO))
train_x, train_y, val_x, val_y = train[:percent], train_label[:percent], train[percent:], train_label[percent:]
print('Size of training set: {}'.format(train_x.shape))
print('Size of validation set: {}'.format(val_x.shape))

Size of training set: (983945, 429)
Size of validation set: (245987, 429)


Create a data loader from the dataset, feel free to tweak the variable `BATCH_SIZE` here.

In [5]:
BATCH_SIZE = 64

from torch.utils.data import DataLoader

train_set = TIMITDataset(train_x, train_y)
val_set = TIMITDataset(val_x, val_y)
train_loader = DataLoader(train_set, batch_size=BATCH_SIZE, shuffle=True) #only shuffle the training data
val_loader = DataLoader(val_set, batch_size=BATCH_SIZE, shuffle=False)

Cleanup the unneeded variables to save memory.<br>

**notes: if you need to use these variables later, then you may remove this block or clean up unneeded variables later<br>the data size is quite huge, so be aware of memory usage in colab**

In [None]:
import gc

del train, train_label, train_x, train_y, val_x, val_y
gc.collect()

64

## Create Model

Define model architecture, you are encouraged to change and experiment with the model architecture.

In [11]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Classifier(nn.Module):
    def __init__(self):
        super(Classifier, self).__init__()

        # CNN模块：提取局部时频特征
        self.cnn = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=(3, 3), padding=(1, 1)),  # [B, 1, 11, 39] -> [B, 64, 11, 39]
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(2, 2)),  # [B, 64, 5, 19]
            nn.Dropout(0.3),

            nn.Conv2d(64, 128, kernel_size=(3, 3), padding=(1, 1)),  # [B, 128, 5, 19]
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=(1, 2)),  # 关键修改：[B, 128, 5, 9] 保留时间维度
            nn.Dropout(0.3)
        )

        # LSTM模块：捕捉时序依赖
        self.lstm = nn.LSTM(
            input_size=128*9,  # input_size=128*9 (通道×频率)，时间步数为5
            hidden_size=256,
            num_layers=2,
            batch_first=True,
            bidirectional=True
        )

        # 分类层
        self.classifier = nn.Sequential(
            nn.Linear(512, 256),  # 双向LSTM输出
            nn.BatchNorm1d(256),
            nn.ReLU(),
            nn.Dropout(0.4),
            nn.Linear(256, 39)
        )

    def forward(self, x):
        # 输入重塑：[B, 429] -> [B, 1, 11, 39]
        x = x.view(-1, 1, 11, 39)

        # CNN处理：[B, 1, 11, 39] -> [B, 128, 5, 9]
        x = self.cnn(x)

        # 维度变换：[B, 128, 5, 9] -> [B, 5, 1152]
        # 1. 合并通道和特征维度 [B, 128*9, 5]
        # 2. 转置维度 [B, 5, 1152]
        x = x.view(x.size(0), 128*9, -1).permute(0, 2, 1)

        # LSTM处理：[B, 5, 1152] -> [B, 5, 512]
        x, _ = self.lstm(x)

        # 取最后时间步输出：[B, 512]
        x = x[:, -1, :]

        # 分类输出：[B, 39]
        return self.classifier(x)


## Training

In [12]:
#check device
def get_device():
  return 'cuda' if torch.cuda.is_available() else 'cpu'

Fix random seeds for reproducibility.

In [13]:
# fix random seed
def same_seeds(seed):
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

Feel free to change the training parameters here.

In [14]:
# fix random seed for reproducibility
same_seeds(0)
device = get_device()
print(f'DEVICE: {device}')

# 训练参数优化
num_epoch = 30                # 增加训练轮次，语音任务需要更充分收敛
learning_rate = 0.0005        # 降低学习率，提升稳定性
weight_decay = 1e-5           # L2正则化防止过拟合

# 模型初始化
model = Classifier().to(device)
criterion = nn.CrossEntropyLoss()

# 优化器改进（AdamW + 学习率调度）
optimizer = torch.optim.AdamW(
    model.parameters(),
    lr=learning_rate,
    weight_decay=weight_decay
)

# 学习率调度器（验证损失停滞时自动降低LR）
scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
    optimizer,
    mode='min',
    factor=0.5,
    patience=5,
    verbose=True
)

DEVICE: cuda


In [15]:
# 开始训练
best_acc = 0.0
for epoch in range(num_epoch):

    # 训练阶段
    model.train()
    train_acc, train_loss = 0.0, 0.0
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)
        optimizer.zero_grad()
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        loss.backward()

        # 梯度裁剪防止LSTM梯度爆炸
        torch.nn.utils.clip_grad_norm_(model.parameters(), 1.0)

        optimizer.step()

        preds = outputs.argmax(dim=1)
        train_acc += (preds == labels).sum().item()
        train_loss += loss.item()

    # 验证阶段
    model.eval()
    val_acc, val_loss = 0.0, 0.0
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            preds = outputs.argmax(dim=1)
            val_acc += (preds == labels).sum().item()
            val_loss += loss.item()

    # 计算指标
    avg_train_acc = train_acc / len(train_set)
    avg_train_loss = train_loss / len(train_loader)
    avg_val_acc = val_acc / len(val_set)
    avg_val_loss = val_loss / len(val_loader)

    # 学习率调度
    scheduler.step(avg_val_loss)

    # 打印日志
    print(f'Epoch [{epoch+1}/30]: '
          f'Train Acc: {avg_train_acc:.4f} Loss: {avg_train_loss:.4f} | '
          f'Val Acc: {avg_val_acc:.4f} Loss: {avg_val_loss:.4f} | '
          f'LR: {optimizer.param_groups[0]["lr"]:.2e}')

    # 保存最佳模型
    if avg_val_acc > best_acc:
        best_acc = avg_val_acc
        torch.save({
            'epoch': epoch,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'val_acc': best_acc,
        }, './model.ckpt')
        print(f'Saved model with val acc {best_acc:.4f}')

Epoch [1/50]: Train Acc: 0.6016 Loss: 1.2901 | Val Acc: 0.6910 Loss: 0.9481 | LR: 5.00e-04
Saved model with val acc 0.6910


KeyboardInterrupt: 

## Testing

Create a testing dataset, and load model from the saved checkpoint.

In [None]:
# create testing dataset
test_set = TIMITDataset(test, None)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

# create model and load weights from checkpoint
model = Classifier().to(device)
model.load_state_dict(torch.load(model_path))

<All keys matched successfully>

Make prediction.

In [None]:
predict = []
model.eval() # set the model to evaluation mode
with torch.no_grad():
    for i, data in enumerate(test_loader):
        inputs = data
        inputs = inputs.to(device)
        outputs = model(inputs)
        _, test_pred = torch.max(outputs, 1) # get the index of the class with the highest probability

        for y in test_pred.cpu().numpy():
            predict.append(y)

Write prediction to a CSV file.

After finish running this block, download the file `prediction.csv` from the files section on the left-hand side and submit it to Kaggle.

In [None]:
with open('prediction.csv', 'w') as f:
    f.write('Id,Class\n')
    for i, y in enumerate(predict):
        f.write('{},{}\n'.format(i, y))