# Self-Attention

In [2]:
import os
import json
import torch
import random
from pathlib import Path
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_packed_sequence #PyTorch中处理变长序列的重要函数

In [38]:
class myDataset(Dataset):
    """Dataset of per-utterance mel features labelled with a speaker id.

    The data directory is expected to contain:
      * ``mapping.json``  - has a ``"speaker2id"`` dict (speaker name -> id).
      * ``metadata.json`` - has a ``"speakers"`` dict mapping each speaker name
        to a list of utterance records, each with a ``"feature_path"`` entry
        that is relative to the data directory.

    Each item is ``(mel, speaker)`` where ``mel`` is a float tensor of at most
    ``segment_len`` frames and ``speaker`` is a long tensor of shape ``(1,)``.
    """

    def __init__(self, data_dir, segment_len=128):
        """Index every utterance found in ``metadata.json``.

        Args:
            data_dir: Directory holding the JSON files and the feature files.
            segment_len: Maximum number of frames returned per item.
        """
        self.data_dir = data_dir
        self.segment_len = segment_len

        # Speaker-name -> integer-id mapping.
        # Use context managers so the file handles are closed deterministically.
        mapping_path = Path(data_dir) / "mapping.json"
        with mapping_path.open(encoding="utf-8") as mapping_file:
            mapping = json.load(mapping_file)
        self.speaker2id = mapping["speaker2id"]

        # Per-speaker utterance metadata for the training data.
        metadata_path = Path(data_dir) / "metadata.json"
        with metadata_path.open(encoding="utf-8") as metadata_file:
            metadata = json.load(metadata_file)["speakers"]

        self.speaker_num = len(metadata)  # total number of speakers

        # One [feature_path, speaker_id] entry per utterance.
        self.data = [
            [utterance["feature_path"], self.speaker2id[speaker]]
            for speaker, utterances in metadata.items()
            for utterance in utterances
        ]

    def __len__(self):
        """Return the number of indexed utterances."""
        return len(self.data)

    def __getitem__(self, index):
        """Load one utterance and return ``(mel, speaker)``.

        Utterances longer than ``segment_len`` frames are cropped to a random
        window of exactly ``segment_len`` frames; shorter ones are returned
        unchanged, so items may have different lengths and any padding has to
        happen when samples are batched.
        """
        feat_path, speaker = self.data[index]

        # NOTE(review): torch.load unpickles the file; only point data_dir at
        # trusted feature files.
        # Presumably each file holds a (frames, n_mels) float tensor - confirm
        # against the preprocessing step.
        mel = torch.load(os.path.join(self.data_dir, feat_path))

        if len(mel) > self.segment_len:
            # Pick a random start so different calls see different crops.
            start = random.randint(0, len(mel) - self.segment_len)
            mel = torch.FloatTensor(mel[start:start + self.segment_len])
        else:
            mel = torch.FloatTensor(mel)

        # Build the label directly as int64 instead of round-tripping through
        # float32, which cannot represent every large integer exactly.
        speaker = torch.tensor([speaker], dtype=torch.long)

        return mel, speaker

    def get_speaker_number(self):
        """Return the number of distinct speakers listed in the metadata."""
        return self.speaker_num



In [27]:
import torch
from torch.utils.data import DataLoader, random_split # random_split随机分割
from torch.nn.utils.rnn import pad_sequence # pad_sequence序列填充

In [28]:
def collate_batch(batch):
    """Merge a list of ``(mel, speaker)`` samples into one padded batch.

    Args:
        batch: Iterable of ``(mel, speaker)`` pairs.

    Returns:
        ``(mels, labels)``: the mel tensors padded along the first (time) axis
        with the value -20 and stacked batch-first, and the speaker labels as
        a single long tensor.
    """
    # Transpose the list of pairs into one tuple of features and one of labels.
    features, speaker_ids = zip(*batch)

    # Shorter sequences are right-padded with -20 up to the longest one.
    padded_features = pad_sequence(features, batch_first=True, padding_value=-20)

    labels = torch.FloatTensor(speaker_ids).long()
    return padded_features, labels

In [29]:
def get_dataloader(data_dir, batch_size, n_workers):
    """Build the training and validation data loaders.

    The dataset found in ``data_dir`` is split randomly into 90% training and
    10% validation samples.

    Args:
        data_dir: Directory passed to ``myDataset``.
        batch_size: Number of samples per batch for both loaders.
        n_workers: Number of worker processes used by each loader.

    Returns:
        ``(train_loader, valid_loader, speaker_num)``.
    """
    dataset = myDataset(data_dir)
    speaker_num = dataset.get_speaker_number()

    # int() floors the training share; the remainder goes to validation.
    trainlen = int(0.9 * len(dataset))
    lengths = [trainlen, len(dataset) - trainlen]
    trainset, validset = random_split(dataset, lengths)

    train_loader = DataLoader(
        trainset,
        batch_size=batch_size,
        shuffle=True,  # reshuffle the training samples every epoch
        drop_last=True,  # keep every training batch the same size
        num_workers=n_workers,
        pin_memory=True,
        collate_fn=collate_batch,  # pads variable-length samples
    )
    valid_loader = DataLoader(
        validset,
        batch_size=batch_size,
        shuffle=False,
        num_workers=n_workers,
        # Keep the final partial batch: dropping it would silently exclude
        # validation samples from the metric, and would leave the loader empty
        # when the validation split is smaller than one batch.
        drop_last=False,
        pin_memory=True,
        collate_fn=collate_batch,
    )

    return train_loader, valid_loader, speaker_num

In [30]:
import torch
import torch.nn as nn
import torch.nn.functional as F

In [31]:
class Classifier(nn.Module):
    """Speaker classifier: linear prenet -> one Transformer encoder layer ->
    mean pooling over time -> two-layer prediction head."""

    def __init__(self, d_model=80, n_spk=600, dropout=0.1, input_dim=40):
        """
        Args:
            d_model: Feature dimension used inside the model.
            n_spk: Number of speakers (size of the output layer).
            dropout: Dropout rate used inside the Transformer encoder layer.
            input_dim: Size of the last dimension of the input features.
        """
        super().__init__()

        # Project the input features to the model dimension.
        # (batch, seq_len, input_dim) -> (batch, seq_len, d_model)
        self.prenet = nn.Linear(input_dim, d_model)

        # Single Transformer encoder layer with 2 attention heads and a
        # 256-unit feed-forward block. `dropout` is forwarded explicitly so
        # the constructor argument actually takes effect.
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, dim_feedforward=256, nhead=2, dropout=dropout
        )

        # Prediction head: Linear -> ReLU -> Linear.
        # (batch, d_model) -> (batch, n_spk)
        self.pred_layer = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(),
            nn.Linear(d_model, n_spk),
        )

    def forward(self, mels):
        """Compute speaker logits.

        Args:
            mels: Tensor of shape (batch, seq_len, input_dim).

        Returns:
            Tensor of shape (batch, n_spk).
        """
        # (batch, seq_len, input_dim) -> (batch, seq_len, d_model)
        out = self.prenet(mels)

        # The encoder layer is constructed with its default sequence-first
        # layout, so move the time axis to the front.
        # (batch, seq_len, d_model) -> (seq_len, batch, d_model)
        out = out.permute(1, 0, 2)

        # Shape is preserved: (seq_len, batch, d_model)
        out = self.encoder_layer(out)

        # Back to batch-first: (batch, seq_len, d_model)
        out = out.transpose(0, 1)

        # Mean pooling over time: (batch, seq_len, d_model) -> (batch, d_model)
        stats = out.mean(dim=1)

        # (batch, d_model) -> (batch, n_spk)
        out = self.pred_layer(stats)
        return out

In [32]:
# Build the model with its default arguments and print its module tree.
print(Classifier())

Classifier(
  (prenet): Linear(in_features=40, out_features=80, bias=True)
  (encoder_layer): TransformerEncoderLayer(
    (self_attn): MultiheadAttention(
      (out_proj): NonDynamicallyQuantizableLinear(in_features=80, out_features=80, bias=True)
    )
    (linear1): Linear(in_features=80, out_features=256, bias=True)
    (dropout): Dropout(p=0.1, inplace=False)
    (linear2): Linear(in_features=256, out_features=80, bias=True)
    (norm1): LayerNorm((80,), eps=1e-05, elementwise_affine=True)
    (norm2): LayerNorm((80,), eps=1e-05, elementwise_affine=True)
    (dropout1): Dropout(p=0.1, inplace=False)
    (dropout2): Dropout(p=0.1, inplace=False)
  )
  (pred_layer): Sequential(
    (0): Linear(in_features=80, out_features=80, bias=True)
    (1): ReLU()
    (2): Linear(in_features=80, out_features=600, bias=True)
  )
)


In [33]:
import math

import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR

In [34]:
# 带预热的余弦学习率调度器
def get_cosine_schedule_with_warmup(
        optimizer: Optimizer,
        num_warmup_steps: int,
        num_training_steps: int,
        num_cycles: float = 0.5,
        last_epoch: int = -1,
):
    """Create a LambdaLR schedule with linear warmup followed by cosine decay.

    Args:
        optimizer: Optimizer whose learning rate is scheduled.
        num_warmup_steps: Number of steps over which the multiplier rises
            linearly from 0 to 1.
        num_training_steps: Total number of training steps.
        num_cycles: Number of cosine cycles after warmup; the default 0.5
            takes the multiplier from 1 down to 0.
        last_epoch: Index of the last epoch, forwarded to LambdaLR.

    Returns:
        A LambdaLR scheduler that multiplies the base learning rate by the
        computed factor.
    """
    warmup_span = float(max(1, num_warmup_steps))
    decay_span = float(max(1, num_training_steps - num_warmup_steps))

    def _lr_factor(step):
        # Warmup phase: linear ramp.
        if step < num_warmup_steps:
            return float(step) / warmup_span

        # Fraction of the post-warmup schedule already completed.
        progress = float(step - num_warmup_steps) / decay_span

        # Cosine annealing, clamped so the factor never goes negative.
        angle = math.pi * float(num_cycles) * 2.0 * progress
        factor = 0.5 * (1.0 + math.cos(angle))
        return max(0.0, factor)

    return LambdaLR(optimizer, _lr_factor, last_epoch)

In [35]:
import torch
def model_fn(batch, model, criterion, device):
    """Run one forward pass on a batch and compute its loss and accuracy.

    Args:
        batch: Pair ``(mels, labels)``.
        model: Callable mapping the mel batch to per-class scores.
        criterion: Loss function called as ``criterion(outputs, labels)``.
        device: Device the batch tensors are moved to.

    Returns:
        ``(loss, accuracy)``: the loss as returned by ``criterion`` and the
        fraction of correct predictions in the batch (computed without
        gradient tracking).
    """
    features, targets = batch

    # Move both tensors to the target device.
    features = features.to(device)
    targets = targets.to(device)

    logits = model(features)
    loss = criterion(logits, targets)

    # The accuracy is a metric only, so no gradients are needed for it.
    with torch.no_grad():
        predicted = torch.argmax(logits, dim=1)
        accuracy = torch.mean((predicted == targets).float())

    return loss, accuracy

In [36]:
from tqdm import tqdm
import torch

def valid(dataloader, model, criterion, device):
    """Evaluate ``model`` on ``dataloader`` and return its accuracy.

    The model is put in eval mode for the duration of the pass and switched
    back to train mode afterwards, even if a batch raises.

    Args:
        dataloader: Iterable of ``(mels, labels)`` batches; must expose
            ``.dataset`` with a length (used for the progress bar total).
        model: Model to evaluate.
        criterion: Loss function forwarded to ``model_fn``.
        device: Device forwarded to ``model_fn``.

    Returns:
        The fraction of correctly classified samples as a Python float, or
        0.0 when the loader yields no samples.
    """
    model.eval()

    # Sums are weighted by batch size so that a smaller final batch does not
    # count as much as a full one.
    loss_sum = 0.0
    correct_sum = 0.0
    n_seen = 0

    pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc="Valid", unit=" uttr")
    try:
        with torch.no_grad():
            for batch in dataloader:
                loss, accuracy = model_fn(batch, model, criterion, device)

                n_batch = len(batch[0])
                loss_sum += loss.item() * n_batch
                correct_sum += accuracy.item() * n_batch
                n_seen += n_batch

                # Advance by the real batch size, not the nominal one.
                pbar.update(n_batch)
                if n_seen:
                    pbar.set_postfix(
                        loss=f"{loss_sum / n_seen:.2f}",
                        accuracy=f"{correct_sum / n_seen:.2f}",
                    )
    finally:
        pbar.close()
        model.train()  # back to training mode

    # Guard against an empty loader instead of dividing by zero.
    return correct_sum / n_seen if n_seen else 0.0

In [None]:
from tqdm import tqdm

import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, random_split


def parse_args():
  """Return the run configuration as a dict of keyword arguments.

  The values are hard-coded: dataset directory, checkpoint path, batch size,
  number of loader workers, and the step counts for validation, warmup,
  checkpointing and total training.
  """
  return dict(
    data_dir="../../Data/ml2021spring-hw4/Dataset",
    save_path="model.ckpt",
    batch_size=32,
    n_workers=8,
    valid_steps=2000,
    warmup_steps=1000,
    save_steps=10000,
    total_steps=70000,
  )


def main(
  data_dir,
  save_path,
  batch_size,
  n_workers,
  valid_steps,
  warmup_steps,
  total_steps,
  save_steps,
):
  """Train the speaker classifier and periodically checkpoint the best model.

  Args:
    data_dir: Dataset directory passed to ``get_dataloader``.
    save_path: File the best model's state dict is written to.
    batch_size: Batch size for both data loaders.
    n_workers: Number of data-loading worker processes.
    valid_steps: Run validation every this many training steps.
    warmup_steps: Number of learning-rate warmup steps.
    total_steps: Total number of training steps.
    save_steps: Write the best state dict so far every this many steps.
  """
  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  print(f"[Info]: Use {device} now!")

  train_loader, valid_loader, speaker_num = get_dataloader(data_dir, batch_size, n_workers)
  train_iterator = iter(train_loader)
  print("[Info]: Finish loading data!", flush=True)

  # The Classifier constructor names this argument `n_spk`; passing `n_spks`
  # raises a TypeError.
  model = Classifier(n_spk=speaker_num).to(device)
  criterion = nn.CrossEntropyLoss()
  optimizer = AdamW(model.parameters(), lr=1e-3)
  scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)
  print("[Info]: Finish creating model!", flush=True)

  best_accuracy = -1.0
  best_state_dict = None

  pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

  for step in range(total_steps):
    # Fetch the next batch, restarting the loader once an epoch is exhausted.
    try:
      batch = next(train_iterator)
    except StopIteration:
      train_iterator = iter(train_loader)
      batch = next(train_iterator)

    loss, accuracy = model_fn(batch, model, criterion, device)
    batch_loss = loss.item()
    batch_accuracy = accuracy.item()

    # Update the model.
    loss.backward()
    optimizer.step()
    scheduler.step()
    optimizer.zero_grad()

    # Log progress.
    pbar.update()
    pbar.set_postfix(
      loss=f"{batch_loss:.2f}",
      accuracy=f"{batch_accuracy:.2f}",
      step=step + 1,
    )

    # Validate periodically.
    if (step + 1) % valid_steps == 0:
      pbar.close()

      valid_accuracy = valid(valid_loader, model, criterion, device)

      # Keep the best model. state_dict() returns references to the live
      # parameters, so clone them; otherwise later optimizer steps would
      # overwrite the "best" weights in place.
      if valid_accuracy > best_accuracy:
        best_accuracy = valid_accuracy
        best_state_dict = {
          name: tensor.detach().clone()
          for name, tensor in model.state_dict().items()
        }

      pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

    # Save the best model so far.
    if (step + 1) % save_steps == 0 and best_state_dict is not None:
      torch.save(best_state_dict, save_path)
      pbar.write(f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")

  pbar.close()


# Start training with the hard-coded configuration when run as the main module.
if __name__ == "__main__":
  main(**parse_args())


[Info]: Use cuda now!
