In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
# Walk the Kaggle input directory and print the full path of every file found.
for folder, _subdirs, names in os.walk('/kaggle/input'):
    for name in names:
        print(os.path.join(folder, name))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/ml2021spring-hw2/sampleSubmission.csv
/kaggle/input/ml2021spring-hw2/timit_11/timit_11/train_11.npy
/kaggle/input/ml2021spring-hw2/timit_11/timit_11/test_11.npy
/kaggle/input/ml2021spring-hw2/timit_11/timit_11/train_label_11.npy


In [None]:
import numpy as np

print('Loading data ...')

# Directory holding the .npy arrays. The trailing slash matters because file
# names are appended with plain string concatenation.
data_root='/kaggle/input/ml2021spring-hw2/timit_11/timit_11/'

# Load training features, training labels and test features, in that order.
train, train_label, test = (
    np.load(data_root + npy_name)
    for npy_name in ('train_11.npy', 'train_label_11.npy', 'test_11.npy')
)

print('Size of training data: {}'.format(train.shape))
print('Size of testing data: {}'.format(test.shape))

In [None]:
import torch
from torch.utils.data import Dataset

class TIMITDataset(Dataset):
    """In-memory dataset of feature rows with optional class labels.

    Args:
        X: NumPy array of features; converted to a float32 tensor.
        y: Optional NumPy array of labels. When given, it is cast to int64
            and stored as a LongTensor. When omitted (e.g. at test time),
            ``self.label`` is ``None`` and items are returned without labels.
    """

    def __init__(self, X, y=None):
        self.data = torch.from_numpy(X).float()
        # Labels must be integer class indices, hence the explicit int64 cast.
        self.label = None if y is None else torch.LongTensor(y.astype(np.int64))

    def __getitem__(self, idx):
        """Return ``data[idx]``, paired with ``label[idx]`` when labels exist."""
        sample = self.data[idx]
        if self.label is None:
            return sample
        return sample, self.label[idx]

    def __len__(self):
        """Return the number of samples (length of the feature tensor)."""
        return len(self.data)


In [None]:
# Fraction of the training data held out for validation.
VAL_RATIO = 0.2

# Index of the first validation row: everything before it is used for training.
percent = int(train.shape[0] * (1 - VAL_RATIO))

# Split features and labels at the same index so rows stay aligned.
train_x, val_x = train[:percent], train[percent:]
train_y, val_y = train_label[:percent], train_label[percent:]

print('Size of training set: {}'.format(train_x.shape))
print('Size of validation set: {}'.format(val_x.shape))

In [None]:
# Number of samples per mini-batch (used by every DataLoader in this notebook).
BATCH_SIZE = 512

from torch.utils.data import DataLoader

# Training split: shuffle=True asks the loader to reshuffle the samples every
# epoch, so the model does not see them in a fixed order.
train_set = TIMITDataset(train_x, train_y)
train_loader = DataLoader(dataset=train_set, batch_size=BATCH_SIZE, shuffle=True)

# Validation split: shuffle=False keeps a fixed order, which makes evaluation
# results easier to reproduce and compare.
val_set = TIMITDataset(val_x, val_y)
val_loader = DataLoader(dataset=val_set, batch_size=BATCH_SIZE, shuffle=False)

In [None]:
# Drop the names of the raw arrays and their train/validation slices, which are
# not referenced again, then ask the garbage collector to run.
import gc

del train, train_label
del train_x, train_y
del val_x, val_y
gc.collect()

In [None]:
#用于语音识别的多层感知机（MLP）分类器
import torch
import torch.nn as nn

# class Classifier(nn.Module):
#     def __init__(self):
#         super(Classifier, self).__init__()
#         self.layer1 = nn.Linear(429, 1024)
#         self.layer2 = nn.Linear(1024, 512)
#         self.layer3 = nn.Linear(512, 128)
#         self.out = nn.Linear(128, 39) 

#         self.act_fn = nn.ReLU()

#     #向前传播函数
#     def forward(self, x):
#         x = self.layer1(x)#线性变换
#         x = self.act_fn(x)  # ReLU activation (act_fn is nn.ReLU, not sigmoid)

#         x = self.layer2(x)
#         x = self.act_fn(x)

#         x = self.layer3(x)
#         x = self.act_fn(x)

#         x = self.out(x)
        
#         return x
#将神经网络改为下面
class Classifier(nn.Module):
    """Fully connected classifier built from repeated hidden blocks.

    Each hidden block is ``Linear -> LeakyReLU -> BatchNorm1d -> Dropout``,
    followed by a final ``Linear`` layer that produces one logit per class.
    With the default arguments the layer sequence (and therefore the
    ``state_dict`` keys ``net.0`` ... ``net.24``) is identical to the previous
    hand-written stack, so existing checkpoints still load.

    Args:
        input_dim: Number of input features per sample (default 429).
        num_classes: Number of output logits (default 39).
        hidden_dims: Width of each hidden block, in order.
        dropout: Drop probability used by every Dropout layer.
    """

    def __init__(self, input_dim=429, num_classes=39,
                 hidden_dims=(2048, 2048, 2048, 1024, 512, 256), dropout=0.5):
        super().__init__()

        layers = []
        in_features = input_dim
        for width in hidden_dims:
            # Modules are created in the same order as before so that, for a
            # fixed seed, the random initialisation is unchanged.
            layers.extend([
                nn.Linear(in_features, width),
                nn.LeakyReLU(),
                nn.BatchNorm1d(width),
                nn.Dropout(dropout),
            ])
            in_features = width
        # Output layer: raw logits, no activation (CrossEntropyLoss expects logits).
        layers.append(nn.Linear(in_features, num_classes))

        self.net = nn.Sequential(*layers)

    def forward(self, x):
        """Run ``x`` through the layer stack and return the class logits."""
        return self.net(x)

In [None]:
#check device
def get_device():
    """Return 'cuda' when torch reports CUDA as available, otherwise 'cpu'."""
    if torch.cuda.is_available():
        return 'cuda'
    return 'cpu'

In [None]:
# 固定随机种子的函数
def same_seeds(seed):
    """Seed every random number generator this notebook may touch.

    Seeds Python's built-in ``random`` module, NumPy's global generator and
    PyTorch (CPU, plus all CUDA devices when CUDA is available), and switches
    cuDNN to deterministic mode with benchmarking disabled.

    Args:
        seed: Integer seed applied to all generators.
    """
    # Local import keeps this cell self-contained; the notebook does not
    # import `random` anywhere else.
    import random

    random.seed(seed)
    np.random.seed(seed)
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        # manual_seed_all seeds every CUDA device, including the current one.
        torch.cuda.manual_seed_all(seed)
    # Trade some speed for run-to-run reproducibility of cuDNN kernels.
    torch.backends.cudnn.benchmark = False
    torch.backends.cudnn.deterministic = True

In [None]:
# Fix the random seed (0) for reproducibility. This runs before the model is
# created below, so the seed is already set when the layers are constructed.
same_seeds(0)

# Pick the compute device via get_device() and report it.
device = get_device()
print(f'DEVICE: {device}')

# training parameters
num_epoch = 100              # number of training epochs
learning_rate = 0.0001       # learning rate passed to the optimizer below

# the path where the checkpoint is saved
model_path = './model.ckpt'

# create model, define a loss function, and optimizer
model = Classifier().to(device)
criterion = nn.CrossEntropyLoss() # cross-entropy loss on the model outputs
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)# Adam over all model parameters

In [None]:
# start training

# Highest number of correct validation predictions seen so far.
best_acc = 0.0
for epoch in range(num_epoch):
    # Per-epoch running totals: counts of correct predictions and summed
    # per-batch losses for the training and validation splits.
    train_acc, train_loss = 0.0, 0.0
    val_acc, val_loss = 0.0, 0.0

    # training
    model.train()  # set the model to training mode
    for inputs, labels in train_loader:
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()                      # clear gradients from the previous step
        outputs = model(inputs)                    # forward pass
        batch_loss = criterion(outputs, labels)    # loss for this batch
        train_pred = torch.max(outputs, 1)[1]      # index of the largest output per sample
        batch_loss.backward()                      # back-propagate to compute gradients
        optimizer.step()                           # update the model parameters

        train_acc += (train_pred.cpu() == labels.cpu()).sum().item()  # count correct predictions
        train_loss += batch_loss.item()                               # accumulate batch loss

    # Without a validation split, report the training metrics only.
    if len(val_set) == 0:
        print('[{:03d}/{:03d}] Train Acc: {:3.6f} Loss: {:3.6f}'.format(
            epoch + 1, num_epoch, train_acc/len(train_set), train_loss/len(train_loader)
        ))
        continue

    # validation
    model.eval()  # set the model to evaluation mode
    with torch.no_grad():
        for inputs, labels in val_loader:
            inputs, labels = inputs.to(device), labels.to(device)
            outputs = model(inputs)
            batch_loss = criterion(outputs, labels)
            val_pred = torch.max(outputs, 1)[1]  # index of the largest output per sample

            val_acc += (val_pred.cpu() == labels.cpu()).sum().item()
            val_loss += batch_loss.item()

    # Report training and validation metrics for this epoch.
    print('[{:03d}/{:03d}] Train Acc: {:3.6f} Loss: {:3.6f} | Val Acc: {:3.6f} loss: {:3.6f}'.format(
        epoch + 1, num_epoch, train_acc/len(train_set), train_loss/len(train_loader), val_acc/len(val_set), val_loss/len(val_loader)
    ))

    # If the model improved on the validation set, save a checkpoint for this epoch.
    if val_acc > best_acc:
        best_acc = val_acc
        torch.save(model.state_dict(), model_path)
        print('saving model with acc {:.3f}'.format(best_acc/len(val_set)))

# if not validating, save the last epoch
if len(val_set) == 0:
    torch.save(model.state_dict(), model_path)
    print('saving model at last epoch')


In [None]:
# Create the testing dataset (no labels) and a loader that keeps sample order,
# so predictions line up with the row ids written to the submission file.
test_set = TIMITDataset(test, None)
test_loader = DataLoader(test_set, batch_size=BATCH_SIZE, shuffle=False)

# Create a fresh model and load the weights from the saved checkpoint.
# map_location remaps the stored tensors onto the current device, so a
# checkpoint written on a GPU can also be loaded in a CPU-only session.
model = Classifier().to(device)
model.load_state_dict(torch.load(model_path, map_location=device))

In [None]:
# Predicted class index for every test sample, in loader order.
predict = []
model.eval()  # set the model to evaluation mode
with torch.no_grad():
    for inputs in test_loader:
        inputs = inputs.to(device)
        outputs = model(inputs)
        # Index of the largest output per sample is the predicted class.
        test_pred = torch.max(outputs, 1)[1]
        predict.extend(test_pred.cpu().numpy())

In [None]:
# Write the submission file: a header row, then one "row id,class" line per prediction.
with open('prediction.csv', 'w') as csv_file:
    csv_file.write('Id,Class\n')
    csv_file.writelines(
        '{},{}\n'.format(row_id, label) for row_id, label in enumerate(predict)
    )