In [2]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchaudio
import sys

import matplotlib.pyplot as plt
import IPython.display as ipd

from tqdm import tqdm

In [3]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


In [5]:
from torchaudio.datasets import SPEECHCOMMANDS
import os


class SubsetSC(SPEECHCOMMANDS):
    def __init__(self, subset: str = None):
        super().__init__("./", download=True)

        def load_list(filename):
            filepath = os.path.join(self._path, filename)
            with open(filepath) as fileobj:
                return [os.path.normpath(os.path.join(self._path, line.strip())) for line in fileobj]

        if subset == "validation":
            self._walker = load_list("validation_list.txt")
        elif subset == "testing":
            self._walker = load_list("testing_list.txt")
        elif subset == "training":
            excludes = load_list("validation_list.txt") + load_list("testing_list.txt")
            excludes = set(excludes)
            self._walker = [w for w in self._walker if w not in excludes]
            # 筛选代表数字的音频文件
        digits = ['zero', 'one', 'two', 'three', 'four', 'five', 'six', 'seven', 'eight', 'nine']
        self._walker = [w for w in self._walker if any(digit in w for digit in digits)]
        self.labels = []
    def collect_labels(self):
        # 收集所有唯一的标签
        for _, _, label, _, _ in self:
            if label not in self.labels:
                self.labels.append(label)
        self.labels.sort()

# # Create training and testing split of the data. We do not use validation in this tutorial.
# train_set = SubsetSC("training")
# test_set = SubsetSC("testing")

# waveform, sample_rate, label, speaker_id, utterance_number = train_set[0]

In [6]:
import torch
from torch.utils.data import DataLoader
import torchaudio.transforms as T
from torch.utils.data.dataset import random_split

sample_rate=16000
new_sample_rate = 8000
transform = torchaudio.transforms.Resample(orig_freq=sample_rate, new_freq=new_sample_rate)

from torch.nn.utils.rnn import pad_sequence

def collate_fn(batch):
    # 初始化特征和标签列表
    inputs, labels = [], []

    # 定义一个临时列表来存储所有波形的长度
    lengths = []

    # 提取波形和标签
    for waveform, _, label, _, _ in batch:
        lengths.append(waveform.size(1))
        inputs.append(waveform.squeeze(0)) 
        labels.append(label)

    # 找到最大的波形长度
    max_len = max(lengths)

    # Pad输入波形到相同的长度
    inputs_padded = pad_sequence(inputs, batch_first=True, padding_value=0)

    # 将标签转换为张量
    label_to_index = {label: index for index, label in enumerate(sorted(set(labels)))}
    labels_indices = torch.tensor([label_to_index[label] for label in labels])

    return inputs_padded, labels_indices

# all_data = SubsetSC(subset=None)  # 加载所有数据
# total_size = len(all_data)
# train_size = int(total_size * 0.8)
# test_size = total_size - train_size
# train_dataset, test_dataset = random_split(all_data, [train_size, test_size])

# train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
# test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)
# train_dataset, test_dataset = random_split(all_data, [train_size, test_size])

# 使用SubsetSC类和collate_fn来创建DataLoader
if device == "cuda":
    num_workers = 16
    pin_memory = True
else:
    num_workers = 0
    pin_memory = False
train_set = SubsetSC(subset='training')
train_loader = DataLoader(train_set, batch_size=64, shuffle=True, collate_fn=collate_fn, num_workers=num_workers, pin_memory=pin_memory)
val_set = SubsetSC(subset='validation')
val_loader = DataLoader(val_set, batch_size=64, shuffle=True, collate_fn=collate_fn, num_workers=num_workers, pin_memory=pin_memory)
test_set = SubsetSC(subset='testing')
test_loader = DataLoader(test_set, batch_size=64, shuffle=False, collate_fn=collate_fn, num_workers=num_workers, pin_memory=pin_memory)

train_set.collect_labels()
print("Training set labels:", train_set.labels)

Training set labels: ['eight', 'five', 'four', 'nine', 'one', 'seven', 'six', 'three', 'two', 'zero']


In [13]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class Perceptron(nn.Module):
    def __init__(self, input_size, num_classes):
        super(Perceptron, self).__init__()
        self.fc = nn.Linear(input_size, num_classes)
    
    def forward(self, x):
        out = self.fc(x)
        return out


In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F

class MLP(nn.Module):
    def __init__(self, input_size, hidden_size, num_classes):
        super(MLP, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, num_classes)
    
    def forward(self, x):
        out = self.fc1(x)
        out = self.relu(out)
        out = self.fc2(out)
        return out


In [17]:
# 参数设置
input_size =  16000
hidden_size = 64
num_classes = 10 
learning_rate = 0.001
num_epochs = 10
best_accuracy = 0.0
# 模型、损失函数和优化器
model = Perceptron(input_size, num_classes)
model.to(device)
# model = MLP(input_size, hidden_size, num_classes).to(device)
print(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

for epoch in range(num_epochs):
    # 训练过程...
    model.train()  # 确保模型处于训练模式
    for i, (features, labels) in enumerate(train_loader):
        
        labels=labels.to(device)
        features = features.view(features.size(0), -1)
        features=features.to(device)
        outputs = model(features)
        loss = criterion(outputs, labels)

        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

    # 验证过程
    model.eval()  # 设置模型为评估模式
    correct = 0
    total = 0
    with torch.no_grad():
        for features, labels in val_loader:
            features=features.to(device)
            labels=labels.to(device)
            features = features.view(features.size(0), -1)
            outputs = model(features)
            _, predicted = torch.max(outputs.data, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    
    accuracy = correct / total
    print(f'Epoch [{epoch+1}/{num_epochs}], Accuracy: {accuracy:.4f}')
    
    # 检查是否为最佳模型，并保存
    if accuracy > best_accuracy:
        print(f"Found better model at epoch {epoch+1} with accuracy {accuracy:.4f}. Saving model...")
        best_accuracy = accuracy
        torch.save(model.state_dict(), 'best_model_accuracy.pt')  # 保存最佳模型的权重


cuda


In [23]:
# 测试模型
model.load_state_dict(torch.load('best_model_accuracy.pt'))

model.eval()  # 设置模型为评估模式
with torch.no_grad():
    correct = 0
    total = 0
    for features, labels in test_loader:
        features=features.to(device)
        labels=labels.to(device)
        features = features.view(features.size(0), -1)
        outputs = model(features)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
    
    print(f'Accuracy of the model on the test set: {100 * correct / total} %')


Accuracy of the model on the test set: 10.372534696859022 %
