# Task description
- Classify the speakers of given features.
- Main goal: Learn how to use transformer.
- Baselines:
  - Easy: Run sample code and know how to use transformer.
  - Medium: Know how to adjust parameters of transformer.
  - Strong: Construct [conformer](https://arxiv.org/abs/2005.08100) which is a variety of transformer.
  - Boss: Implement [Self-Attention Pooling](https://arxiv.org/pdf/2008.01077v1.pdf) & [Additive Margin Softmax](https://arxiv.org/pdf/1801.05599.pdf) to further boost the performance.

- Other links
  - Competiton: [link](https://www.kaggle.com/t/49ea0c385a974db5919ec67299ba2e6b)
  - Slide: [link](https://docs.google.com/presentation/d/1LDAW0GGrC9B6D7dlNdYzQL6D60-iKgFr/edit?usp=sharing&ouid=104280564485377739218&rtpof=true&sd=true)
  - Data: [link](https://github.com/googly-mingto/ML2023HW4/releases)



In [None]:
!wget https://github.com/googly-mingto/ML2023HW4/releases/download/data/Dataset.tar.gz.partaa
!wget https://github.com/googly-mingto/ML2023HW4/releases/download/data/Dataset.tar.gz.partab
!wget https://github.com/googly-mingto/ML2023HW4/releases/download/data/Dataset.tar.gz.partac
!wget https://github.com/googly-mingto/ML2023HW4/releases/download/data/Dataset.tar.gz.partad

!cat Dataset.tar.gz.part* > Dataset.tar.gz
!rm Dataset.tar.gz.partaa
!rm Dataset.tar.gz.partab
!rm Dataset.tar.gz.partac
!rm Dataset.tar.gz.partad
# unzip the file
!tar zxf Dataset.tar.gz
!rm Dataset.tar.gz

In [None]:
!tar zxf Dataset.tar.gz

In [None]:
# 导入必要的库
import numpy as np
import torch
import random

def set_seed(seed):
    """
    设置随机种子，以确保实验的可复现性。
    
    参数:
    seed (int): 随机种子值。
    
    该函数通过设置numpy、Python内置随机模块、PyTorch的随机种子，以及对CUDA设备的随机种子进行设置，来确保随机数生成的一致性。
    此外，还配置了PyTorch的cudnn行为，以确保在使用CUDA时也具有可复现性。
    """
    # 设置numpy的随机种子，保证numpy相关的随机操作可复现
    np.random.seed(seed)
    # 设置Python内置随机模块的随机种子，使得基于random的随机操作结果一致
    random.seed(seed)
    # 设置PyTorch的随机种子，确保张量操作等的随机性可控
    torch.manual_seed(seed)
    # 如果CUDA可用，设置CUDA的随机种子
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
        torch.cuda.manual_seed_all(seed)
    # 禁用cudnn自动寻找最适合当前硬件配置的卷积算法，这有助于结果的复现性
    torch.backends.cudnn.benchmark = False
    # 设置cudnn为确定性模式
    torch.backends.cudnn.deterministic = True

# 设置随机种子以确保实验可复现性
set_seed(87)


# Data

## Dataset
- Original dataset is [Voxceleb2](https://www.robots.ox.ac.uk/~vgg/data/voxceleb/vox2.html).
- The [license](https://creativecommons.org/licenses/by/4.0/) and [complete version](https://www.robots.ox.ac.uk/~vgg/data/voxceleb/files/license.txt) of Voxceleb2.
- We randomly select 600 speakers from Voxceleb2.
- Then preprocess the raw waveforms into mel-spectrograms.

- Args:
  - data_dir: The path to the data directory.
  - metadata_path: The path to the metadata.
  - segment_len: The length of audio segment for training.
- The architecture of data directory \\
  - data directory \\
  |---- metadata.json \\
  |---- testdata.json \\
  |---- mapping.json \\
  |---- uttr-{random string}.pt \\

- The information in metadata
  - "n_mels": The dimention of mel-spectrogram.
  - "speakers": A dictionary.
    - Key: speaker ids.
    - value: "feature_path" and "mel_len"


For efficiency, we segment the mel-spectrograms into segments in the traing step.

为了提高效率，在训练步骤中，我们将梅尔频谱图切分为多个片段

In [None]:
import os
import json
import torch
import random
from pathlib import Path
from torch.utils.data import Dataset
from torch.nn.utils.rnn import pad_sequence


# 定义一个自定义数据集类myDataset，继承自PyTorch的Dataset基类
class myDataset(Dataset):
    def __init__(self, data_dir, segment_len=128):
        # 初始化方法，接收数据目录和段长度作为参数
        self.data_dir = data_dir  # 数据目录路径
        self.segment_len = segment_len  # 每个数据段的长度，默认为128

        # 加载从说话人名称到其对应ID的映射文件
        mapping_path = Path(data_dir) / "mapping.json"  # 映射文件路径
        with mapping_path.open() as f:  # 打开文件
            mapping = json.load(f)  # 解析JSON文件内容
        self.speaker2id = mapping["speaker2id"]  # 获取映射字典

        # 加载训练数据的元数据
        metadata_path = Path(data_dir) / "metadata.json"  # 元数据文件路径
        with open(metadata_path) as f:  # 打开文件
            metadata = json.load(f)["speakers"]  # 解析JSON文件内容

        # 获取说话人的总数
        self.speaker_num = len(metadata.keys())  # 计算键（说话人）的数量

        # 初始化一个空列表，用于存储数据
        self.data = []

        # 遍历所有说话人及其语音片段
        for speaker in metadata.keys():
            for utterances in metadata[speaker]:  # 遍历说话人的语音片段
                # 将特征路径和对应的说话人ID存储到数据列表中
                self.data.append([utterances["feature_path"], self.speaker2id[speaker]])



	# 返回数据集的总样本数量
	def __len__(self):
		return len(self.data)

	# 根据索引获取数据集中的一项数据
	def __getitem__(self, index):
		# 从数据列表中提取特征路径和说话人ID
		feat_path, speaker = self.data[index]
		
		# 使用torch.load加载预处理过的梅尔频谱图数据
		mel = torch.load(os.path.join(self.data_dir, feat_path))
		
		# 如果梅尔频谱图的帧数超过设定的segment_len
		if len(mel) > self.segment_len:
			# 随机选择一个起始点，以这个点开始裁剪segment_len长度的片段
			start = random.randint(0, len(mel) - self.segment_len)
			# 截取指定长度的梅尔频谱图片段
			mel = torch.FloatTensor(mel[start:start+self.segment_len])
		else:
			# 如果原始长度不足，直接转换为FloatTensor
			mel = torch.FloatTensor(mel)
			
		# 将说话人ID转换为Long类型，以便后续计算损失函数时使用
		speaker = torch.FloatTensor([speaker]).long()
		# 返回处理后的梅尔频谱图和说话人ID
		return mel, speaker

	# 获取数据集中说话人的总数
	def get_speaker_number(self):
		return self.speaker_num


## Dataloader
- Split dataset into training dataset(90%) and validation dataset(10%).
- Create dataloader to iterate the data.

In [None]:
import torch
from torch.utils.data import DataLoader, random_split
from torch.nn.utils.rnn import pad_sequence


# 定义一个函数，用于处理批量数据
def collate_batch(batch):
    """
    这个函数的作用是将一批数据（batch）进行预处理，使其适合送入模型进行训练。
    
    参数:
    batch: 一个列表，包含多个样本，每个样本由梅尔频谱图和对应的说话人ID组成。
    
    返回:
    - mel: 垂直堆叠并填充后的梅尔频谱图张量，形状为 (batch_size, max_length, 40)。
    - speaker: 转换为Long类型的说话人ID张量，形状为 (batch_size,)。
    """
    # 使用zip函数将batch中的梅尔频谱图和说话人ID分开
    mel, speaker = zip(*batch)

    # 在同一个批次中，我们需要将梅尔频谱图的长度统一，因此使用pad_sequence进行填充
    # 参数batch_first=True表示将批次维度放在第一个维度，padding_value=-20表示用非常小的值（log 10^(-20)）进行填充
    mel = pad_sequence(mel, batch_first=True, padding_value=-20)  # 填充后梅尔频谱图的形状为 (batch_size, length, 40)

    # 将说话人ID转换为Long类型，以便于模型处理
    speaker = torch.FloatTensor(speaker).long()

    # 返回处理后的梅尔频谱图和说话人ID
    return mel, speaker



# 定义一个函数，用于生成训练和验证数据加载器
def get_dataloader(data_dir, batch_size, n_workers):
    """
    该函数根据给定的参数创建训练和验证数据加载器。
    
    参数:
    - data_dir: 存储数据的目录路径。
    - batch_size: 每个批次的样本数量。
    - n_workers: 用于数据预处理的子进程数量。
    
    返回:
    - train_loader: 训练数据加载器。
    - valid_loader: 验证数据加载器。
    - speaker_num: 数据集中说话人的总数。
    """
    # 创建myDataset实例
    dataset = myDataset(data_dir)
    
    # 获取说话人的总数
    speaker_num = dataset.get_speaker_number()
    
    # 将数据集划分为训练集和验证集，比例为9:1
    trainlen = int(0.9 * len(dataset))  # 训练集的长度
    lengths = [trainlen, len(dataset) - trainlen]  # 训练集和验证集的长度列表
    trainset, validset = random_split(dataset, lengths)  # 使用随机分割
    
    # 创建训练数据加载器
    train_loader = DataLoader(
        trainset,  # 使用训练集数据
        batch_size=batch_size,  # 每个批次的大小
        shuffle=True,  # 训练时需要打乱数据顺序
        drop_last=True,  # 删除最后一个不足batch_size的小批次
        num_workers=n_workers,  # 数据预处理子进程数量
        pin_memory=True,  # 使用 pinned memory 提高性能，它避免了数据在CPU和GPU之间传输时因内存页面交换带来的额外延迟，从而加速了数据传输速度
        collate_fn=collate_batch,  # 使用之前定义的collate_batch函数
    )
    
    # 创建验证数据加载器
    valid_loader = DataLoader(
        validset,  # 使用验证集数据
        batch_size=batch_size,  # 每个批次的大小
        num_workers=n_workers,  # 数据预处理子进程数量
        drop_last=True,  # 删除最后一个不足batch_size的小批次
        pin_memory=True,  # 使用 pinned memory 提高性能
        collate_fn=collate_batch,  # 使用之前定义的collate_batch函数
    )
    
    # 返回训练和验证数据加载器以及说话人的总数
    return train_loader, valid_loader, speaker_num


# Model
- TransformerEncoderLayer:
  - Base transformer encoder layer in [Attention Is All You Need](https://arxiv.org/abs/1706.03762)
  - Parameters:
    - d_model: the number of expected features of the input (required).

    - nhead: the number of heads of the multiheadattention models (required).

    - dim_feedforward: the dimension of the feedforward network model (default=2048).

    - dropout: the dropout value (default=0.1).

    - activation: the activation function of intermediate layer, relu or gelu (default=relu).

- TransformerEncoder:
  - TransformerEncoder is a stack of N transformer encoder layers
  - Parameters:
    - encoder_layer: an instance of the TransformerEncoderLayer() class (required).

    - num_layers: the number of sub-encoder-layers in the encoder (required).

    - norm: the layer normalization component (optional).

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


# 定义一个分类器模型，用于识别说话人
class Classifier(nn.Module):
    def __init__(self, d_model=80, n_spks=600, dropout=0.1):
        # 继承自nn.Module
        super().__init__()

        # 输入特征的维度从40转换到d_model
        self.prenet = nn.Linear(40, d_model)

        # 待完成的任务：将Transformer替换为Conformer
        # 参考链接：https://arxiv.org/abs/2005.08100
        self.encoder_layer = nn.TransformerEncoderLayer(
            d_model=d_model, dim_feedforward=256, nhead=2
        )

        # 从d_model维度的特征映射到说话人数量
        self.pred_layer = nn.Sequential(
            nn.Linear(d_model, d_model),
            nn.Sigmoid(),
            nn.Linear(d_model, n_spks),
        )

    def forward(self, mels):
        """
        输入参数：
            mels: (batch size, length, 40) - 梅尔频谱图的张量

        返回：
            out: (batch size, n_spks) - 预测的说话人编号张量
        """
        # 应用prenet，将梅尔频谱图转换为d_model维度
        out = self.prenet(mels)
        
        # 转换张量的形状以适应TransformerEncoderLayer
        out = out.permute(1, 0, 2)  # (length, batch size, d_model)

        # 应用TransformerEncoderLayer
        out = self.encoder_layer(out)

        # 将形状恢复为(batch size, length, d_model)
        out = out.transpose(0, 1)

        # 对每个样本的序列进行平均池化
        stats = out.mean(dim=1)

        # 应用预测层得到说话人编号
        out = self.pred_layer(stats)

        # 返回预测的说话人编号
        return out


# Learning rate schedule
- For transformer architecture, the design of learning rate schedule is different from that of CNN.
- Previous works show that the warmup of learning rate is useful for training models with transformer architectures.
- The warmup schedule
  - Set learning rate to 0 in the beginning.
  - The learning rate increases linearly from 0 to initial learning rate during warmup period.

In [None]:
import math

import torch
from torch.optim import Optimizer
from torch.optim.lr_scheduler import LambdaLR


# 定义一个函数，用于创建带有预热阶段的余弦退火学习率调度器
def get_cosine_schedule_with_warmup(
	optimizer: Optimizer,  # 优化器实例，如Adam或SGD，用于更新模型参数
	num_warmup_steps: int,  # 预热阶段的步数，学习率线性增长至原设定值
	num_training_steps: int,  # 总训练步数，用于计算学习率衰减计划
	num_cycles: float = 0.5,  # 余弦退火周期数，默认0.5意味着半个周期从最大降到0
	last_epoch: int = -1,  # 上一次训练的epoch数，用于恢复训练状态，默认-1表示从头开始
):

	"""
	创建一个学习率调度器，其学习率遵循余弦函数的规律，从初始学习率逐渐增加到最大值，然后逐渐减小到0。
	在增加阶段有一个预热期，学习率线性增加。

	参数:
	- optimizer: torch.optim.Optimizer类型的优化器，需要调整学习率的优化器。
	- num_warmup_steps: int类型，预热阶段的步数。
	- num_training_steps: int类型，总的训练步数。
	- num_cycles: float类型，可选，默认为0.5，余弦退火周期的数量。
	- last_epoch: int类型，可选，默认为-1，恢复训练时的最后一个epoch。

	返回:
	- LambdaLR: 使用lr_lambda函数作为学习率计算规则的torch.optim.lr_scheduler.LambdaLR对象。
	"""

	# 定义一个闭包函数lr_lambda，用于计算当前步骤的学习率
	def lr_lambda(current_step):
		# 预热阶段
		if current_step < num_warmup_steps:
			# 学习率线性增加
			return current_step / max(1, num_warmup_steps)
		# 余弦退火阶段
		else:
			# 计算训练进度
			progress = (current_step - num_warmup_steps) / max(
				1, num_training_steps - num_warmup_steps
			)
			# 余弦退火公式，num_cycles控制周期次数
			return max(
				0.0,
				0.5 * (1.0 + math.cos(math.pi * float(num_cycles) * 2.0 * progress)),
			)

	# 使用lr_lambda创建LambdaLR调度器，并传入最后一个epoch
	return LambdaLR(optimizer, lr_lambda, last_epoch)


# Model Function
- Model forward function.

In [None]:
import torch


def model_fn(batch, model, criterion, device):
    """
    该函数负责将一个数据批次（batch）通过模型进行前向传播，并计算损失和准确率。
    
    参数:
    - batch: 包含音频特征（mels）和对应标签（labels）的数据批次。
    - model: 训练好的模型，用于对音频特征进行分类。
    - criterion: 损失函数，用于衡量模型预测与真实标签之间的差异。
    - device: 指定模型和数据应该在哪个设备（如CPU或GPU）上运行。
    
    返回:
    - loss: 该批次数据的平均损失值。
    - accuracy: 该批次数据的预测准确率。
    """

    # 解包批次数据，获取音频特征（mels）和标签（labels）
    mels, labels = batch

    # 将音频特征和标签转移到指定的设备上（如GPU）
    mels = mels.to(device)
    labels = labels.to(device)

    # 通过模型进行前向传播，得到预测结果
    outs = model(mels)

    # 计算损失（loss），即模型预测结果与真实标签之间的差距
    loss = criterion(outs, labels)

    # 找出每个样本预测概率最高的说话人ID
    preds = outs.argmax(1)

    # 计算准确率，即预测正确的样本数占总样本数的比例
    accuracy = torch.mean((preds == labels).float())  # .float()将布尔值转换为浮点数以便计算平均值

    # 返回损失值和准确率
    return loss, accuracy


# Validate
- Calculate accuracy of the validation set.

In [None]:
from tqdm import tqdm
import torch


def valid(dataloader, model, criterion, device):
	"""
	验证模型在验证集上的性能。

	参数:
	- dataloader: DataLoader对象，包含验证集数据。
	- model: 已训练的模型，用于进行预测。
	- criterion: 损失函数，用于计算预测与真实标签之间的差异。
	- device: 设备（如CPU或GPU），用于模型运算。

	返回:
	- avg_accuracy: 验证集上的平均准确率。
	"""

	# 将模型设置为评估模式，关闭dropout等随机操作
	model.eval()

	# 初始化运行损失和运行准确率
	running_loss = 0.0
	running_accuracy = 0.0

	# 使用tqdm创建进度条，跟踪验证过程
	pbar = tqdm(total=len(dataloader.dataset), ncols=0, desc="Valid", unit=" uttr")

	# 遍历验证集的每个批次
	for i, batch in enumerate(dataloader):
		# 在没有梯度计算的环境中运行模型，以节省内存
		with torch.no_grad():
			# 使用model_fn计算批次的损失和准确率
			loss, accuracy = model_fn(batch, model, criterion, device)
			# 更新运行损失和运行准确率
			running_loss += loss.item()
			running_accuracy += accuracy.item()

		# 更新进度条
		pbar.update(dataloader.batch_size)
		# 设置进度条的附加信息，显示当前的平均损失和准确率
		pbar.set_postfix(loss=f"{running_loss / (i+1):.2f}", accuracy=f"{running_accuracy / (i+1):.2f}")

	# 关闭进度条
	pbar.close()

	# 将模型恢复到训练模式
	model.train()

	# 返回验证集的平均准确率
	return running_accuracy / len(dataloader)


# Main function

In [None]:
from tqdm import tqdm

import torch
import torch.nn as nn
from torch.optim import AdamW
from torch.utils.data import DataLoader, random_split


def parse_args():
	"""
	解析并返回配置参数，用于控制训练流程。
	"""
	config = {
		"data_dir": "./Dataset",  # 数据集的根目录
		"save_path": "model.ckpt",  # 模型权重保存的文件名
		"batch_size": 32,  # 训练和验证时每个批次的样本数量
		"n_workers": 8,  # 数据加载时使用的子进程数，用于并行加载数据
		"valid_steps": 2000,  # 每隔多少训练步进行一次验证
		"warmup_steps": 1000,  # 学习率预热阶段的步数，从0线性增加到设定值
		"save_steps": 10000,  # 每隔多少步保存一次模型权重
		"total_steps": 70000,  # 总的训练步数
	}
	return config



def main(
	data_dir,  # 数据集目录路径
	save_path,  # 模型保存路径
	batch_size,  # 批次大小
	n_workers,  # 数据加载工作线程数
	valid_steps,  # 验证频率（多少步验证一次）
	warmup_steps,  # 学习率预热步数
	total_steps,  # 总训练步数
	save_steps,  # 保存模型的频率（多少步保存一次）
):
	"""
	主函数，负责整个训练流程的控制。
	包括数据加载、模型初始化、训练循环、验证、模型保存等步骤。
	"""

	# 设置设备，优先使用GPU
	device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
	print(f"[Info]: Use {device} now!")

	# 加载训练和验证数据集
	train_loader, valid_loader, speaker_num = get_dataloader(data_dir, batch_size, n_workers)
	train_iterator = iter(train_loader)  # 创建训练数据迭代器
	print(f"[Info]: Finish loading data!", flush=True)

	# 初始化模型、损失函数、优化器和学习率调度器
	model = Classifier(n_spks=speaker_num).to(device)  # 创建分类器模型并放置到指定设备
	criterion = nn.CrossEntropyLoss()  # 交叉熵损失函数，适用于多分类问题
	optimizer = AdamW(model.parameters(), lr=1e-3)  # 使用AdamW优化器，学习率为1e-3
	scheduler = get_cosine_schedule_with_warmup(optimizer, warmup_steps, total_steps)  # 余弦退火学习率调度器
	print(f"[Info]: Finish creating model!", flush=True)

	# 初始化最佳准确率和最佳模型参数
	best_accuracy = -1.0
	best_state_dict = None

	# 使用tqdm创建训练进度条
	pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

	# 主训练循环
	for step in range(total_steps):
		# 获取训练数据
		try:
			batch = next(train_iterator)
		except StopIteration:  # 当迭代器耗尽时重新初始化
			train_iterator = iter(train_loader)
			batch = next(train_iterator)

		# 前向传播、计算损失和准确率
		loss, accuracy = model_fn(batch, model, criterion, device)
		batch_loss = loss.item()
		batch_accuracy = accuracy.item()

		# 反向传播、优化模型参数、更新学习率
		loss.backward()
		optimizer.step()
		scheduler.step()
		optimizer.zero_grad()

		# 更新进度条信息
		pbar.update()
		pbar.set_postfix(loss=f"{batch_loss:.2f}", accuracy=f"{batch_accuracy:.2f}", step=step + 1)

		# 验证模型
		if (step + 1) % valid_steps == 0:
			pbar.close()  # 关闭当前进度条

			# 在验证集上评估模型
			valid_accuracy = valid(valid_loader, model, criterion, device)

			# 保存当前最佳模型
			if valid_accuracy > best_accuracy:
				best_accuracy = valid_accuracy
				best_state_dict = model.state_dict()

			# 重置训练进度条
			pbar = tqdm(total=valid_steps, ncols=0, desc="Train", unit=" step")

		# 保存模型
		if (step + 1) % save_steps == 0 and best_state_dict is not None:
			# 保存最佳模型参数到指定路径
			torch.save(best_state_dict, save_path)
			# 在进度条上记录信息
			pbar.write(f"Step {step + 1}, best model saved. (accuracy={best_accuracy:.4f})")

	pbar.close()  # 训练结束，关闭进度条



if __name__ == "__main__":
	"""
	允许用户通过修改`parse_args`函数中的配置来灵活控制训练流程，而不必直接硬编码这些参数到`main`函数的调用中。
	"""
	main(**parse_args())


# Inference

## Dataset of inference

In [None]:
import os
import json
import torch
from pathlib import Path
from torch.utils.data import Dataset


class InferenceDataset(Dataset):
	"""
	自定义的InferenceDataset类，用于推理阶段的数据加载。
	它继承自torch.utils.data.Dataset，需要实现`__init__`、`__len__`和`__getitem__`方法。
	"""

	def __init__(self, data_dir):
		"""
		初始化InferenceDataset类。

		参数:
		- data_dir: str，存储测试数据的目录，包含一个名为"testdata.json"的文件。
		"""
		testdata_path = Path(data_dir) / "testdata.json"  # 获取测试数据路径
		metadata = json.load(testdata_path.open())  # 读取并加载测试数据的JSON文件
		self.data_dir = data_dir  # 保存数据目录
		self.data = metadata["utterances"]  # 获取测试数据的“utterances”列表

	def __len__(self):
		"""
		返回InferenceDataset的长度，即数据集中元素的数量。
		"""
		return len(self.data)  # 返回“utterances”列表的长度

	def __getitem__(self, index):
		"""
		根据索引获取数据集中的一个元素。

		参数:
		- index: int，要获取的元素的索引。

		返回:
		- feat_path: str，对应测试数据的特征路径。
		- mel: torch.Tensor，加载的梅尔谱特征。
		"""
		utterance = self.data[index]  # 获取索引对应的utterance
		feat_path = utterance["feature_path"]  # 获取特征路径
		mel = torch.load(os.path.join(self.data_dir, feat_path))  # 加载梅尔谱特征

		return feat_path, mel  # 返回特征路径和梅尔谱特征


def inference_collate_batch(batch):
	"""
	将一批数据进行堆叠，用于推理阶段的批量处理。

	参数:
	- batch: 一个元组列表，每个元组包含一个特征路径和对应的梅尔谱特征。

	返回:
	- feat_paths: 列表，包含所有样本的特征路径。
	- mels: torch.Tensor，堆叠后的梅尔谱特征。
	"""
	feat_paths, mels = zip(*batch)  # 解压batch，将特征路径和梅尔谱分开
	return feat_paths, torch.stack(mels)  # 返回堆叠后的特征路径列表和梅尔谱张量


## Main funcrion of Inference

In [None]:
import json
import csv
from pathlib import Path
from tqdm.notebook import tqdm

import torch
from torch.utils.data import DataLoader

def parse_args():
	"""
	解析并返回配置参数，用于控制推理流程。

	返回:
	- config: 字典，包含以下键值对：
		- data_dir: 数据集目录路径。
		- model_path: 模型权重文件路径。
		- output_path: 输出结果CSV文件路径。
	"""
	config = {
		"data_dir": "./Dataset",  # 数据集目录
		"model_path": "./model.ckpt",  # 模型权重文件路径
		"output_path": "./output.csv",  # 输出结果CSV文件路径
	}
	return config


def main(
	data_dir,
	model_path,
	output_path,
):
	"""
	主函数，负责推理流程。

	参数:
	- data_dir: 数据集目录路径。
	- model_path: 模型权重文件路径。
	- output_path: 输出结果CSV文件路径。
	"""
	# 设置设备，优先使用GPU
	device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
	print(f"[Info]: Use {device} now!")

	# 读取映射文件，获取id到说话人的映射
	mapping_path = Path(data_dir) / "mapping.json"
	mapping = json.load(mapping_path.open())

	# 初始化推理数据集和数据加载器
	dataset = InferenceDataset(data_dir)
	dataloader = DataLoader(
		dataset,
		batch_size=1,  # 单个样本的批次大小
		shuffle=False,  # 不打乱数据顺序
		drop_last=False,  # 保留最后一个不足批次大小的样本
		num_workers=8,  # 数据加载的工作线程数
		collate_fn=inference_collate_batch,  # 自定义的批次合并函数
	)
	print(f"[Info]: Finish loading data!", flush=True)

	# 获取说话人数量
	speaker_num = len(mapping["id2speaker"])
	# 初始化模型并加载权重，设置为评估模式
	model = Classifier(n_spks=speaker_num).to(device)
	model.load_state_dict(torch.load(model_path))
	model.eval()
	print(f"[Info]: Finish creating model!", flush=True)

	# 初始化输出结果列表
	results = [["Id", "Category"]]  # CSV文件的列标题

	# 进行推理并收集结果
	for feat_paths, mels in tqdm(dataloader):
		# 在无梯度计算环境下运行模型
		with torch.no_grad():
			mels = mels.to(device)
			outs = model(mels)
			preds = outs.argmax(1).cpu().numpy()  # 获取预测的说话人ID
			for feat_path, pred in zip(feat_paths, preds):
				# 将预测结果转换为说话人名称并添加到结果列表
				results.append([feat_path, mapping["id2speaker"][str(pred)]])

	# 将结果写入CSV文件
	with open(output_path, 'w', newline='') as csvfile:
		writer = csv.writer(csvfile)
		writer.writerows(results)



if __name__ == "__main__":
	main(**parse_args())