In [2]:
import os
import time
import pandas as pd
import librosa
import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import DataLoader, Dataset
from tqdm import tqdm
from torch.nn.utils.rnn import pad_sequence
from IPython.display import clear_output, Audio

In [3]:
# ---- 数据集加载和特征提取 ----

class AudioDataset(Dataset):
    def __init__(self, dataframe, data_dir, n_mfcc=13):
        """
        Args:
            dataframe (pd.DataFrame): df contains file names and labels
            data_dir (str): data directory containing audio files
            n_mfcc (int): MFCC features count
        """
        self.dataframe = dataframe
        self.data_dir = data_dir
        self.n_mfcc = n_mfcc

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        """
        Returns:
            features (torch.Tensor): (seq_len, feature_dim)
            label (int): int
        """
        row = self.dataframe.iloc[idx]
        file_name = row['filename']
        label = row['class']  # class 列是类别名 ('dog', 'cat', 'bird')
        
        # 音频路径
        file_path = os.path.join(self.data_dir, file_name)
        
        # 加载音频并提取时序特征 (MFCC)
        y, sr = librosa.load(file_path, sr=None)
        
        # 提取 MFCC 特征
        mfcc = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=self.n_mfcc)
        mfcc = torch.tensor(mfcc.T, dtype=torch.float32)  # 转置为 (seq_len, feature_dim)
        
        # 将类别字符串映射为索引
        label_mapping = {'cat': 0, 'dog': 1, 'bird': 2}
        label = label_mapping[label]

        return mfcc, label


# 由于不同音频长度不同，需要使用自定义的 collate_fn 来将同一个 batch 中的音频 padding 到其中的最大长度
def collate_fn(batch):
    """
    for padding data with different lengths

    Args:
        batch (tuple): (features, label)

    Returns:
        padded_batch_features (torch.Tensor): (batch_size, max_seq_len, feature_dim)
        batch_labels (torch.Tensor): (batch_size,)
    """
    features = [item[0] for item in batch]  # 提取每个样本的 features
    labels = [item[1] for item in batch]    # 提取每个样本的 label

    # 对不定长序列进行 padding，填充后 shape: (batch_size, max_seq_len, feature_dim)
    padded_features = pad_sequence(features, batch_first=True)
    labels = torch.tensor(labels, dtype=torch.long)             # 转为 Tensor

    return padded_features, labels

In [15]:
# 加载训练集和测试集路径
dataset_dir = "data"
train_csv_path = f"{dataset_dir}/train.csv"
test_csv_path = f"{dataset_dir}/test.csv"
train_dir = f"{dataset_dir}/train"
test_dir = f"{dataset_dir}/test"

train_df = pd.read_csv(train_csv_path)
test_df = pd.read_csv(test_csv_path)

# 创建数据集
train_dataset = AudioDataset(train_df, train_dir)
test_dataset = AudioDataset(test_df, test_dir)

# 数据加载器
batch_size = 16 # Tips: 你可以根据显存来自己调整来获得最佳性能
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False, collate_fn=collate_fn)

In [16]:
# 查看下数据集，其中 class 表示类别，数据集中的类别有 cat, dog, bird 总共 3 类
train_df.head()

Unnamed: 0,filename,class
0,000.wav,bird
1,001.wav,cat
2,002.wav,bird
3,003.wav,dog
4,004.wav,bird


In [17]:
# 可以试听一下数据集中的音频文件
Audio(f"{train_dir}/002.wav")

In [8]:
# 查看第一个 batch 的数据形状
for i, (features, labels) in enumerate(train_loader):
    print(f"Batch {i+1}:")
    print(f"Features shape: {features.shape}")
    print(f"Labels shape: {labels.shape}")
    if i >= 2:
        break

Batch 1:
Features shape: torch.Size([16, 32, 13])
Labels shape: torch.Size([16])
Batch 2:
Features shape: torch.Size([16, 32, 13])
Labels shape: torch.Size([16])
Batch 3:
Features shape: torch.Size([16, 32, 13])
Labels shape: torch.Size([16])


In [9]:
# 使用自定义的 RNN 模型
class CustomRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        """
        RNN model initialization

        Args:
            input_size: feature dim
            hidden_size: hidden state dim
            output_size: output dim
        """
        super(CustomRNN, self).__init__()
        
        # 参数定义
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        
        # RNN 权重参数

        self.W_xh = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_hh = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.b_h = nn.Parameter(torch.zeros(hidden_size))
        
        # 输出权重参数
        self.W_ho = nn.Parameter(torch.randn(hidden_size, output_size))
        self.b_o = nn.Parameter(torch.zeros(output_size))
        
        # Xavier uniform 初始化权重
        nn.init.xavier_uniform_(self.W_xh)
        nn.init.xavier_uniform_(self.W_hh)
        nn.init.xavier_uniform_(self.W_ho)
        

    def forward(self, x, hidden_state=None):
        """
        Args:  
            x: (seq_len, batch_size, input_size)
            hidden_state: (batch_size, hidden_size) 

        Returns:  
            output_seq: (seq_len, batch_size, output_size)
        """
        seq_len, batch_size, _ = x.shape
        
        if hidden_state is None:  
            hidden_state = torch.zeros(batch_size, self.hidden_size).to(x.device)  # 初始化隐状态为零

        # 遍历时间步
        for t in range(seq_len):
            
            hidden_state = torch.tanh(x[t] @ self.W_xh + hidden_state @ self.W_hh + self.b_h)
        
        logits = hidden_state @ self.W_ho + self.b_o
        return logits

In [10]:
# 超参数设置
# Tips: 你可以自行调整来获得最佳性能
input_size = 13      # MFCC 特征数
hidden_size = 32     # 隐状态维度
output_size = 3      # 输出类别数
learning_rate = 0.005
num_epochs = 50

device = torch.device('cuda' if torch.cuda.is_available() else 'mps')
device

device(type='mps')

In [11]:
# 模型初始化
model = CustomRNN(input_size, hidden_size, output_size).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

In [12]:
# 训练
losses = []

time_start = time.time()
for epoch in range(num_epochs):
    model.train()
    total_loss = 0
    for features, labels in train_loader:
        features, labels = features.to(device), labels.to(device)
        # 转换为 RNN 的输入格式 (seq_len, batch_size, input_size)
        features = features.permute(1, 0, 2)
        logits = model(features)
        loss = criterion(logits, labels)

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    losses.append(total_loss)

    # 每个epoch刷新loss曲线
    clear_output(wait=True)
    plt.figure(figsize=(8, 5))
    plt.plot(range(epoch + 1), losses, marker='.')
    plt.title("Training Loss per Epoch")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.xlim(0, num_epochs)
    plt.grid()
    plt.show()

time_end = time.time()

# 输出最后的loss
print(f"Final Loss: {total_loss:.4f}")
print(f"Training Time: {time_end - time_start:.2f} seconds, {(time_end - time_start) / epoch:.2f} s/epoch")

Final Loss: 32.3144
Training Time: 44.73 seconds, 0.91 s/epoch


In [14]:
# 评估
model.eval()
correct = 0
total = 0
with torch.no_grad():
    for features, labels in test_loader:
        features, labels = features.to(device), labels.to(device)
        features = features.permute(1, 0, 2)
        logits = model(features)
        _,predicted = torch.max(logits.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")

Test Accuracy: 34.15%


In [19]:
class CustomLSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        """
        LSTM model initialization
        
        Args:
            input_size: feature dim
            hidden_size: hidden state dim
            output_size: output dim
        """
        super(CustomLSTM, self).__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size

        # 初始化 LSTM 的权重参数
        # 遗忘门
        self.W_xf = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_hf = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.b_f = nn.Parameter(torch.zeros(hidden_size))

        # 输入门
        self.W_xi = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_hi = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.b_i = nn.Parameter(torch.zeros(hidden_size))

        # 候选记忆单元
        self.W_xc = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_hc = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.b_c = nn.Parameter(torch.zeros(hidden_size))

        # 输出门
        self.W_xo = nn.Parameter(torch.randn(input_size, hidden_size))
        self.W_ho = nn.Parameter(torch.randn(hidden_size, hidden_size))
        self.b_o = nn.Parameter(torch.zeros(hidden_size))

        # 输出层参数
        self.fc = nn.Linear(in_features=hidden_size, out_features=output_size)

        # 初始化参数
        self.init_weights()

    def init_weights(self):
        """
        Apply Xavier uniform initialization to the weights of the LSTM.
        """
        for param in self.parameters():
            if param.dim() > 1:  # 对权重矩阵进行初始化
                nn.init.xavier_uniform_(param)

    def forward(self, x):
        """
        Args:
            x: (seq_len, batch_size, input_size)
        
        Returns:
            logits: (batch_size, output_size)
        """
        seq_len, batch_size, _ = x.shape

        # 初始化隐状态和细胞状态
        h_t = torch.zeros(batch_size, self.hidden_size).to(x.device)  # 初始隐状态
        c_t = torch.zeros(batch_size, self.hidden_size).to(x.device)  # 初始细胞状态

        # 遍历时间步
        for t in range(seq_len):
            # 遗忘门
            f_t = torch.sigmoid(x[t] @ self.W_xf + h_t @ self.W_hf + self.b_f)

            # 输入门
            i_t = torch.sigmoid(x[t] @ self.W_xi + h_t @ self.W_hi + self.b_i)

            # 候选单元
            c_hat_t = torch.tanh(x[t] @ self.W_xc + h_t @ self.W_hc + self.b_c)

            # 更新细胞状态
            c_t = f_t * c_t + i_t * c_hat_t

            # 输出门
            o_t = torch.sigmoid(x[t] @ self.W_xo + h_t @ self.W_ho + self.b_o)

            # 更新隐状态
            h_t = o_t * torch.tanh(c_t)

        # 最后时间步的隐状态输入输出层
        logits = self.fc(h_t)  # (batch_size, output_size)
        return logits

In [20]:
# 模型初始化
lstm_model = CustomLSTM(input_size, hidden_size, output_size).to(device)
criterion = nn.CrossEntropyLoss().to(device)
optimizer = torch.optim.Adam(lstm_model.parameters(), lr=learning_rate)

In [21]:
# 训练
losses = []

time_start = time.time()
for epoch in range(num_epochs):
    lstm_model.train()
    total_loss = 0
    for features, labels in train_loader:
        features, labels = features.to(device), labels.to(device)
        # 转换为 RNN 的输入格式 (seq_len, batch_size, input_size)
        features = features.permute(1, 0, 2)
        logits = lstm_model(features)
        loss = criterion(logits, labels)

        # 反向传播和优化
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    losses.append(total_loss)

    # 每个epoch刷新loss曲线
    clear_output(wait=True)
    plt.figure(figsize=(8, 5))
    plt.plot(range(epoch + 1), losses, marker='.')
    plt.title("Training Loss per Epoch")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.xlim(0, num_epochs)
    plt.grid()
    plt.show()

time_end = time.time()

# 输出最后的loss
print(f"Final Loss: {total_loss:.4f}")
print(f"Training Time: {time_end - time_start:.2f} seconds, {(time_end - time_start) / epoch:.2f} s/epoch")

Final Loss: 4.1891
Training Time: 86.65 seconds, 1.77 s/epoch


In [23]:
# 评估
lstm_model.eval()
correct = 0
total = 0
with torch.no_grad():
    for features, labels in test_loader:
        features, labels = features.to(device), labels.to(device)
        features = features.permute(1, 0, 2)
        logits = lstm_model(features)
        _,predicted = torch.max(logits.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

print(f"Test Accuracy: {100 * correct / total:.2f}%")

Test Accuracy: 89.43%


使用 LSTM 后，在训练阶段能观测到 loss 下降更为迅速，在训练 50 个 epoch 后 loss 下降到一个较低的水平（一般在 5.0 以下），并且在测试集上准确率能够达到 85%-95%，这表明 LSTM 模型学会了如何进行音频分类。