In [1]:
import imp
import os
import json
import torch
import random
from pathlib import Path
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence


In [2]:
class myDataset(Dataset):
    """Dataset of (feature segment, speaker id) pairs read from a data directory.

    The directory must contain:
      * ``mapping.json``  with a ``"speaker2id"`` dict (speaker name -> integer id)
      * ``metadata.json`` with a ``"speakers"`` dict mapping each speaker name to a
        list of utterance records, each holding a ``"feature_path"`` relative to
        ``data_dir`` that ``torch.load`` can read.

    Args:
        data_dir: directory holding the two JSON files and the feature files.
        segment_len: maximum number of frames returned per item; longer
            utterances are randomly cropped to this length.
    """

    def __init__(self, data_dir, segment_len=128) -> None:
        self.data_dir = data_dir
        self.segment_len = segment_len

        # Use context managers so the JSON file handles are closed deterministically.
        mapping_path = Path(data_dir) / 'mapping.json'
        with mapping_path.open() as mapping_file:
            mapping = json.load(mapping_file)
        self.speaker2id = mapping['speaker2id']

        metadata_path = Path(data_dir) / 'metadata.json'
        with metadata_path.open() as metadata_file:
            metadata = json.load(metadata_file)['speakers']

        # Number of speakers, i.e. the number of classes to predict.
        self.speaker_num = len(metadata)

        # Flat list of [feature_path, speaker_id], one entry per utterance.
        self.data = [
            [utt['feature_path'], self.speaker2id[speaker]]
            for speaker, utterances in metadata.items()
            for utt in utterances
        ]

    def __len__(self):
        """Return the total number of utterances."""
        return len(self.data)

    def __getitem__(self, index):
        """Load one utterance and return ``(mel, speaker)``.

        ``mel`` is a float tensor with at most ``segment_len`` rows along its
        first dimension; ``speaker`` is a one-element int64 tensor.
        """
        feature_path, speaker = self.data[index]
        mel = torch.load(os.path.join(self.data_dir, feature_path))

        # Randomly crop long utterances; randint is inclusive on both ends, so
        # the last valid start offset is len(mel) - segment_len.
        if len(mel) > self.segment_len:
            start = random.randint(0, len(mel) - self.segment_len)
            mel = mel[start:start + self.segment_len]

        mel = torch.FloatTensor(mel)
        # Build the label directly as int64 instead of round-tripping through float32.
        speaker = torch.tensor([speaker], dtype=torch.long)
        return mel, speaker

    def get_speaker_number(self):
        """Return the number of distinct speakers listed in the metadata."""
        return self.speaker_num

In [3]:
from torch.utils.data import DataLoader,random_split
def collate_batch(batch):
    """Collate a list of ``(mel, speaker)`` samples into one padded batch.

    Returns:
        features: the mel tensors padded along the time axis to the longest
            sample in the batch (``batch_first=True``), using -20 as the
            padding value.
        labels: an int64 tensor built from the per-sample speaker entries.
    """
    features, labels = zip(*batch)
    labels = torch.FloatTensor(labels).long()
    # pad_sequence extends every shorter sequence with padding_value until all
    # sequences match the longest one in the batch.
    features = pad_sequence(features, padding_value=-20, batch_first=True)
    return features, labels


def get_dataloader(data_dir, batch_size, n_workers):
  """Build the training and validation dataloaders.

  Args:
    data_dir: directory passed to ``myDataset``.
    batch_size: number of utterances per batch for both loaders.
    n_workers: number of DataLoader worker processes.

  Returns:
    ``(train_loader, valid_loader, speaker_num)`` where ``speaker_num`` is the
    value reported by ``myDataset.get_speaker_number()``.
  """
  dataset = myDataset(data_dir)
  speaker_num = dataset.get_speaker_number()

  # Random split: 90% of the utterances for training, the remainder for validation.
  trainlen = int(0.9 * len(dataset))
  lengths = [trainlen, len(dataset) - trainlen]
  trainset, validset = random_split(dataset, lengths)

  train_loader = DataLoader(
      trainset,
      batch_size=batch_size,
      shuffle=True,
      drop_last=True,
      num_workers=n_workers,
      pin_memory=True,
      collate_fn=collate_batch,
  )
  # Keep the final partial batch for validation: dropping it would silently
  # exclude utterances from the metric and would leave the loader empty
  # whenever the validation split is smaller than one batch.
  valid_loader = DataLoader(
      validset,
      batch_size=batch_size,
      num_workers=n_workers,
      drop_last=False,
      pin_memory=True,
      collate_fn=collate_batch,
  )

  return train_loader, valid_loader, speaker_num


In [4]:
# Smoke test: build both loaders from the 'Data' directory with batch size 64
# and no worker processes; the cell output shows the returned tuple.
get_dataloader('Data', 64, 0)


(<torch.utils.data.dataloader.DataLoader at 0x183babbcfd0>,
 <torch.utils.data.dataloader.DataLoader at 0x183bb8cb040>,
 600)

In [5]:
# Sanity check: shape of the feature tensor returned for the first utterance.
d = myDataset('Data')
d[0][0].shape

torch.Size([128, 40])

In [6]:
import torch
import torch.nn as nn
import torch.nn.functional as F
class Classifier(nn.Module):
    """Speaker classifier: linear prenet -> Transformer encoder layer -> mean pooling -> MLP.

    Args:
        d_model: feature width fed to the Transformer encoder layer.
        n_spks: number of output classes (speakers).
        dropout: dropout probability used inside the encoder layer.
    """

    def __init__(self, d_model=80, n_spks=600, dropout=0.1):
        super().__init__()
        # Project the 40-dimensional input features to d_model.
        self.prenet = nn.Linear(40, d_model)
        # A self-attention *encoder* layer. A decoder layer cannot be used here
        # because its forward() additionally requires a `memory` argument.
        # dim_feedforward: width of the feed-forward sublayer; nhead: attention heads.
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, dim_feedforward=256, nhead=2, dropout=dropout
        )
        self.pred_layer = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.ReLU(),
            nn.Linear(d_model, n_spks),
        )

    def forward(self, mels):
        """
        args:
        mels: (batch size, length, 40)
        return:
        out: (batch size, n_spks)
        """
        # out: (batch size, length, d_model)
        out = self.prenet(mels)
        # The encoder layer (batch_first=False) expects (length, batch size, d_model).
        out = out.permute(1, 0, 2)
        # out: (length, batch size, d_model)
        out = self.encoder_layer(out)
        # Back to (batch size, length, d_model).
        out = out.transpose(0, 1)
        # Mean pooling over the time axis -> (batch size, d_model).
        stats = out.mean(dim=1)

        # out: (batch size, n_spks)
        out = self.pred_layer(stats)
        return out

In [7]:
"""
前人研究表明:学习率的warm-up机制对训练Transformer架构的模型非常有用。
warm-up机制的步骤:
(1)在开始的时候设学习率为0.
(2)在warm-up阶段中,学习率从0线性的逐渐增加到预先设定好的阈值.
"""
import math
import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR
def get_cosine_schedule_with_warmup(optimizer, num_warmup_steps, num_training_steps, num_cycles=0.5, last_epoch=-1):
    """Create a LambdaLR schedule with linear warm-up followed by cosine decay.

    The returned scheduler multiplies the optimizer's initial learning rate by
    a factor that rises linearly from 0 to 1 over ``num_warmup_steps`` and then
    follows a cosine curve, clamped at 0.

    optimizer: optimizer whose learning rate is scheduled
    num_warmup_steps: number of steps in the linear warm-up phase
    num_training_steps: total number of training steps (warm-up included)
    num_cycles: number of cosine waves in the decay phase; the default 0.5
        decays from 1 to 0 over one half-wave
    last_epoch: forwarded to LambdaLR as ``last_epoch``
    """
    warmup_span = float(max(1, num_warmup_steps))
    decay_span = float(max(1, num_training_steps - num_warmup_steps))

    def _multiplier(step):
        # Cosine decay phase once warm-up is over.
        if step >= num_warmup_steps:
            progress = float(step - num_warmup_steps) / decay_span
            cosine = math.cos(math.pi * float(num_cycles) * 2.0 * progress)
            return max(0.0, 0.5 * (1.0 + cosine))
        # Linear warm-up phase.
        return float(step) / warmup_span

    # LambdaLR sets lr = initial lr * _multiplier(step).
    return LambdaLR(optimizer, _multiplier, last_epoch)


In [8]:
"""接下来写一些model训练相关的函数"""
def model_fn(batch, model, criterion, device):
    """Forward one batch through the model.

    batch: pair ``(mels, labels)``; both are moved to ``device``
    model: callable mapping mels to per-class scores
    criterion: loss function called as ``criterion(outputs, labels)``
    device: target device for the batch tensors

    Returns ``(loss, accuracy)``, where accuracy is the mean of the
    element-wise match between the argmax over dim 1 and the labels.
    """
    mels, labels = batch
    mels, labels = mels.to(device), labels.to(device)

    logits = model(mels)
    batch_loss = criterion(logits, labels)

    # Predicted class = index of the largest score along dim 1.
    hits = logits.argmax(1) == labels
    return batch_loss, hits.float().mean()
from tqdm import tqdm


def valid(dataloader, model, criterion, device):
  """Validate on validation set.

  Puts the model in eval mode, runs every batch through ``model_fn`` without
  gradient tracking, and switches the model back to train mode afterwards
  (also when a batch raises).

  Returns:
    The sample-weighted mean accuracy over all validation utterances, or
    0.0 when the dataloader yields no batches.
  """

  model.eval()
  running_loss = 0.0
  correct_weighted = 0.0  # sum of (batch accuracy * batch size)
  seen = 0                # number of utterances processed so far
  pbar = tqdm(total=len(dataloader.dataset),
              ncols=0, desc="Valid", unit=" uttr")

  try:
    with torch.no_grad():
      for i, batch in enumerate(dataloader):
        loss, accuracy = model_fn(batch, model, criterion, device)

        # batch is (mels, labels); the label count is the real batch size,
        # which can be smaller than dataloader.batch_size for the last batch.
        n_items = len(batch[1])
        running_loss += loss.item()
        correct_weighted += accuracy.item() * n_items
        seen += n_items

        pbar.update(n_items)
        pbar.set_postfix(
            loss=f"{running_loss / (i+1):.2f}",
            accuracy=f"{correct_weighted / seen:.2f}",
        )
  finally:
    # Always release the progress bar and restore training mode.
    pbar.close()
    model.train()

  # Guard against an empty loader instead of raising ZeroDivisionError.
  return correct_weighted / seen if seen else 0.0


In [9]:
from tqdm import tqdm

import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, random_split


def parse_args():
  """Return the training configuration as keyword arguments for ``main``."""
  return dict(
      data_dir="Data",
      save_path="model.ckpt",
      batch_size=32,
      n_workers=8,
      valid_steps=2000,
      warmup_steps=1000,
      save_steps=10000,
      total_steps=70000,
  )


def main(
    data_dir,
    save_path,
    batch_size,
    n_workers,
    valid_steps,
    warmup_steps,
    total_steps,
    save_steps,
):
  """Train the classifier, validating periodically and saving the best weights.

  Args:
    data_dir: dataset directory passed to ``get_dataloader``.
    save_path: file the best state dict is written to.
    batch_size: batch size for both dataloaders.
    n_workers: number of DataLoader worker processes.
    valid_steps: run validation every this many training steps.
    warmup_steps: length of the learning-rate warm-up phase.
    total_steps: total number of training steps.
    save_steps: write the best state dict every this many steps.
  """
  import copy  # local import: only needed to snapshot the best weights

  device = torch.device("cuda" if torch.cuda.is_available() else "cpu")  # pick the device
  print(f"[Info]: 现在使用 {device}")

  train_loader, valid_loader, speaker_num = get_dataloader(
      data_dir, batch_size, n_workers)
  train_iterator = iter(train_loader)
  print("数据载入完毕")

  model = Classifier(n_spks=speaker_num).to(device)
  criterion = nn.CrossEntropyLoss()
  optimizer = AdamW(model.parameters(), lr=1e-3)
  scheduler = get_cosine_schedule_with_warmup(
      optimizer, warmup_steps, total_steps)
  print("模型创建完毕")

  best_accuracy = -1.0
  best_state_dict = None

  pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

  for step in range(total_steps):
    # Get data; restart the iterator once the training set is exhausted.
    try:
      batch = next(train_iterator)
    except StopIteration:
      train_iterator = iter(train_loader)
      batch = next(train_iterator)

    loss, accuracy = model_fn(batch, model, criterion, device)
    batch_loss = loss.item()
    batch_accuracy = accuracy.item()

    # Update model
    loss.backward()
    optimizer.step()
    scheduler.step()
    optimizer.zero_grad()

    # Log
    pbar.update()
    pbar.set_postfix(
        loss=f"{batch_loss:.2f}",
        accuracy=f"{batch_accuracy:.2f}",
        step=step + 1,
    )

    # Do validation
    if (step + 1) % valid_steps == 0:
      pbar.close()

      valid_accuracy = valid(valid_loader, model, criterion, device)

      # Keep the best model. state_dict() returns references to the live
      # parameter tensors, so snapshot them with a deep copy; otherwise later
      # optimizer steps would overwrite the "best" weights in place.
      if valid_accuracy > best_accuracy:
        best_accuracy = valid_accuracy
        best_state_dict = copy.deepcopy(model.state_dict())

      pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

    # Save the best model so far.
    if (step + 1) % save_steps == 0 and best_state_dict is not None:
      torch.save(best_state_dict, save_path)
      pbar.write(
          f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")

  pbar.close()


# Entry point: start training with the configuration returned by parse_args().
if __name__ == "__main__":
  main(**parse_args())


[Info]: 现在使用 cuda
