加载数据集。可以使用PyTorch中的Dataset和DataLoader来实现，并且可以使用torchaudio中的transforms来对语音信号进行处理，例如MFCC特征提取。

In [47]:
#测试
import torchaudio
waveform, sample_rate = torchaudio.load("data/dev-clean/LibriSpeech/dev-clean/84/121123/84-121123-0000.flac")

In [63]:
import torch
from torch.utils.data import Dataset, DataLoader
import torchaudio
import os
from torchaudio.transforms import MelSpectrogram, MFCC
from torch.nn.utils.rnn import pad_sequence

class LibriSpeechDataset(Dataset):
    def __init__(self, data_dir, train=True, max_audio_length=320000):
        super().__init__() 
        self.max_audio_length = max_audio_length
        if train:
            self.data_dir = os.path.join(data_dir, 'train-clean-100', 'LibriSpeech', 'train-clean-100')
        else:
            self.data_dir = os.path.join(data_dir, 'dev-clean', 'LibriSpeech', 'dev-clean')
        
        self.data_list = []
        fa_data_dirs = self.get_subfolders(self.data_dir)
        for item in fa_data_dirs:
            fa_dir_item = os.path.join(self.data_dir, item)
            ch_data_dir = self.get_subfolders(fa_dir_item)
            for it in ch_data_dir:
                ch_dir_item = os.path.join(fa_dir_item, it)
                data_list_path = self.get_txt_files(ch_dir_item)[0]
                with open(data_list_path, 'r') as f:
                    for line in f.readlines():
                        line = line.strip('\n').split(' ')
                        file_path = os.path.join(ch_dir_item, line[0]) + ".flac"
                        label = ' '.join(line[1:])
                        self.data_list.append([file_path, label])
        
        self.transforms = MFCC(sample_rate=16000, n_mfcc=26, melkwargs={
            'n_fft': 512,
            'hop_length': 160,
            'f_min': 20,
            'f_max': 8000
        })
        
        self.max_audio_length = max_audio_length

    # def __getitem__(self, index):
    #     file_path, label = self.data_list[index]
    #     waveform, sample_rate = torchaudio.load(file_path)
        
    #     if self.max_audio_length is not None:
    #         waveform = self.pad_audio(waveform, self.max_audio_length)
        
    #     mfcc_transformed = self.transforms(waveform)
    #     return mfcc_transformed, label
    def __getitem__(self, index):
        file_path, label = self.data_list[index]
        waveform, sample_rate = torchaudio.load(file_path)
        mfcc_transformed = self.transforms(waveform)

        # Perform padding
        if mfcc_transformed.size(1) < self.max_audio_length:
            padding = torch.zeros((1, self.max_audio_length - mfcc_transformed.size(1), mfcc_transformed.size(2)))
            mfcc_transformed = torch.cat([mfcc_transformed, padding], dim=1)

        return mfcc_transformed, label
    
    
    def __len__(self):
        return len(self.data_list)

    def get_subfolders(self, folder_path):
        subfolders = []
        for item in os.listdir(folder_path):
            item_path = os.path.join(folder_path, item)
            if os.path.isdir(item_path):
                subfolders.append(item)
        return subfolders
    
    def get_txt_files(self, folder_path):
        txt_files = []
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                if file.endswith(".txt"):
                    file_path = os.path.join(root, file)
                    txt_files.append(file_path)
        return txt_files
    
    def pad_audio(self, waveform, max_length):
        current_length = waveform.size(1)
        if current_length < max_length:
            padding = torch.zeros(1, max_length - current_length)
            waveform = torch.cat([waveform, padding], dim=1)
        elif current_length > max_length:
            waveform = waveform[:, :max_length]
        return waveform

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
train_dataset = LibriSpeechDataset('data', train=True, max_audio_length=320000)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=pad_sequence)

val_dataset = LibriSpeechDataset('data', train=False, max_audio_length=320000)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=pad_sequence)

# for batch in train_dataloader:
#     inputs, labels = batch
#     inputs = inputs.to(device)
#     labels = labels.to(device)


In [38]:
import os
import torch
import numpy as np
import torchaudio
from torch.utils.data import Dataset, DataLoader
from torchaudio.transforms import MelSpectrogram, MFCC

class LibriSpeechDataset(Dataset):
    def __init__(self, data_dir, train=True):
        super().__init__() 
        
        if train:
            self.data_dir = os.path.join(data_dir, 'train-clean-100','LibriSpeech','train-clean-100')
            # data_list_path = os.path.join(data_dir, 'train-clean-100', 'LibriSpeech','train-clean-100')
        else:
            self.data_dir = os.path.join(data_dir, 'dev-clean','LibriSpeech','dev-clean')
            # data_list_path = os.path.join(data_dir, 'dev-clean', 'LibriSpeech','dev-clean')
        
        self.data_list = []
        fa_data_dirs = self.get_subfolders(self.data_dir)
        for item in fa_data_dirs :
            fa_dir_item = os.path.join(self.data_dir,item) #19
            ch_data_dir = self.get_subfolders(fa_dir_item)
            for it in ch_data_dir:
                ch_dir_item = os.path.join(fa_dir_item,it) #198
                data_list_path = self.get_txt_files(ch_dir_item)[0]
                with open(data_list_path, 'r') as f:
                    for line in f.readlines():
                        line = line.strip('\n').split(' ')
                        # line[0] = line[0].replace("\\", ".flac")
                        file_path = os.path.join(ch_dir_item, line[0])
                        file_path = file_path + ".flac"
                        label = ' '.join(line[1:])
                        self.data_list.append([file_path, label])
        
        self.transforms = MFCC(sample_rate=16000, n_mfcc=26, melkwargs={
            'n_fft': 512,
            'hop_length': 160,
            'f_min': 20,
            'f_max': 8000
        })
    
    def __getitem__(self, index):
        file_path, label = self.data_list[index]
        waveform, sample_rate = torchaudio.load(file_path)
        mfcc_transformed = self.transforms(waveform)
        return mfcc_transformed, label
    
    def __len__(self):
        return len(self.data_list)

    def get_subfolders(self,folder_path):
        subfolders = []
        for item in os.listdir(folder_path):
            item_path = os.path.join(folder_path, item)
            if os.path.isdir(item_path):
                subfolders.append(item)
        return subfolders
    
    def get_txt_files(self,folder_path):
        txt_files = []
        for root, dirs, files in os.walk(folder_path):
            for file in files:
                if file.endswith(".txt"):
                    file_path = os.path.join(root, file)
                    txt_files.append(file_path)
        return txt_files
    
train_dataset = LibriSpeechDataset('data', train=True)
train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)

val_dataset = LibriSpeechDataset('data', train=False)
val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)
# print(train_dataset.data_list)

for batch in train_dataloader:
    inputs, labels = batch


RuntimeError: stack expects each tensor to be equal size, but got [1, 26, 1471] at entry 0 and [1, 26, 1515] at entry 1

定义模型。由于任务是语音识别，因此可以采用一种常见的架构：将语音信号输入一个由多个Transformer编码器组成的编码器，再将编码结果送入由多个Transformer解码器和一个全连接层组成的解码器，最终得到文本输出。

In [64]:
import torch.nn as nn
import torch.nn.functional as F

class EncoderLayer(nn.Module):
    def __init__(self, nhead, d_model, dim_feedforward, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
    
    def forward(self, x, mask=None):
        x2 = self.self_attn(x, x, x, key_padding_mask=mask)[0]
        x = x + self.dropout1(x2)
        x = self.norm1(x)
        x2 = F.relu(self.linear1(x))
        x2 = self.dropout2(x2)
        x2 = self.linear2(x2)
        x = x + self.dropout1(x2)
        x = self.norm2(x)
        return x

class Encoder(nn.Module):
    def __init__(self, num_layers, nhead, d_model, dim_feedforward, dropout=0.1):
        super().__init__()
        self.layers = nn.ModuleList([EncoderLayer(nhead, d_model, dim_feedforward, dropout) for _ in range(num_layers)])
        self.norm = nn.LayerNorm(d_model)
    
    def forward(self, x, mask=None):
        for layer in self.layers:
            x = layer(x, mask)
        return self.norm(x)
                      

定义解码器，由于解码器是逐步生成输出的，因此我们可以使用一个循环来生成输出，并在每个时间步上应用多头自注意力机制和全连接层。

In [65]:
class DecoderLayer(nn.Module):
    def __init__(self, nhead, d_model, dim_feedforward, dropout=0.1):
        super().__init__()
        self.self_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.multihead_attn = nn.MultiheadAttention(d_model, nhead, dropout=dropout)
        self.linear1 = nn.Linear(d_model, dim_feedforward)
        self.linear2 = nn.Linear(dim_feedforward, d_model)
        self.dropout1 = nn.Dropout(dropout)
        self.dropout2 = nn.Dropout(dropout)
        self.dropout3 = nn.Dropout(dropout)
        self.norm1 = nn.LayerNorm(d_model)
        self.norm2 = nn.LayerNorm(d_model)
        self.norm3 = nn.LayerNorm(d_model)
    
    
    def forward(self, x, memory, tgt_mask=None, memory_mask=None):
        x2 = self.self_attn(x, x, x, attn_mask=tgt_mask)[0]
        x = x + self.dropout1(x2)
        x = self.norm1(x)
        x2 = self.multihead_attn(x, memory, memory, attn_mask=memory_mask)[0]
        x = x + self.dropout2(x2)
        x = self.norm2(x)
        x2 = F.relu(self.linear1(x))
        x2 = self.dropout3(x2)
        x2 = self.linear2(x2)
        x = x + self.dropout1(x2)
        x = self.norm3(x)
        return x

class Decoder(nn.Module):
    def __init__(self, num_layers, nhead, d_model, dim_feedforward, output_dim, dropout=0.1):
        super().__init__()
        self.output_dim = output_dim
        self.layers = nn.ModuleList([DecoderLayer(nhead, d_model, dim_feedforward, dropout) for _ in range(num_layers)])
        self.out_proj = nn.Linear(d_model, output_dim)
        self.norm = nn.LayerNorm(d_model)
    
    def forward(self, x, memory, tgt_mask=None, memory_mask=None):
        for layer in self.layers:
            x = layer(x, memory, tgt_mask, memory_mask)
        x = self.norm(x)
        x = self.out_proj(x)
        return x

我们需要定义完整的模型，包括编码器和解码器，并将数据通过模型进行前向传递计算，并根据损失函数进行反向传播和优化。

In [66]:
class ASRModel(nn.Module):
    def __init__(self, num_layers, nhead, d_model, dim_feedforward, input_dim, output_dim, dropout=0.1):
        super().__init__()
        self.encoder = Encoder(num_layers, nhead, d_model, dim_feedforward, dropout)
        self.decoder = Decoder(num_layers, nhead, d_model, dim_feedforward, output_dim, dropout)
        self.input_proj = nn.Linear(input_dim, d_model)
        self.out = nn.Linear(d_model, output_dim)
    
    def forward(self, src, tgt_input, src_mask=None, tgt_mask=None, memory_mask=None):
        src = self.input_proj(src)
        memory = self.encoder(src, src_mask)
        tgt_input = nn.functional.pad(tgt_input[:, :-1], (1, 0), value=self.decoder.output_dim-1)
        out = self.decoder(tgt_input, memory, tgt_mask=tgt_mask, memory_mask=memory_mask)
        return out
    
    def generate(self, src, src_mask=None, max_len=100, start_symbol=0):
        src = self.input_proj(src)
        memory = self.encoder(src, src_mask)
        ys = torch.ones(1, 1).fill_(start_symbol).type_as(src.data)
        
        for i in range(max_len-1):
            out = self.decoder(ys, memory, tgt_mask=self.subsequent_mask(ys.size(1)).type_as(src.data), memory_mask=None)
            prob = self.out(out[:, -1])
            _, next_word = torch.max(prob, dim=1)
            next_word = next_word.data[0]
            ys = torch.cat([ys, torch.ones(1, 1).type_as(src.data).fill_(next_word)], dim=1)
            if next_word == eos_id:
                break
        return ys
    
    def subsequent_mask(self, size):
        "Mask out subsequent positions."
        attn_shape = (1, size, size)
        subsequent_mask = np.triu(np.ones(attn_shape), k=1).astype('uint8')
        return torch.from_numpy(subsequent_mask) == 0

定义训练和评估过程。
在训练过程中，我们需要计算模型在训练集上的损失，并根据反向传播和优化器进行更新。我们可以使用交叉熵损失来计算模型的输出与目标文本之间的差距。

In [83]:
def train_epoch(model, train_dataloader, optimizer, criterion, device):
    model.train()
    train_loss = 0.0
    num_batches = 0

    for batch_idx, (src, tgt) in enumerate(train_dataloader):
        
        src = src.to(device)
        tgt = tgt.to(device)

        tgt_input = tgt[:, :-1]
        tgt_target = tgt[:, 1:].contiguous().view(-1)

        optimizer.zero_grad()

        output = model(src, tgt_input)
        output = output.view(-1, output.shape[-1])

        loss = criterion(output, tgt_target)
        train_loss += loss.item()

        loss.backward()
        optimizer.step()

        num_batches += 1
        src = None
        tgt = None
        tgt_input = None
        tgt_target = None
        output = None
        loss = None
        

    train_loss /= num_batches
    return train_loss

def eval_epoch(model, val_dataloader, criterion, device):
    model.eval()
    val_loss = 0.0
    num_batches = 0

    with torch.no_grad():
        for batch_idx, (src, tgt) in enumerate(val_dataloader):

            src = src.to(device)
            tgt = tgt.to(device)

            tgt_input = tgt[:, :-1]
            tgt_target = tgt[:, 1:].contiguous().view(-1)

            output = model(src, tgt_input)
            output = output.view(-1, output.shape[-1])

            loss = criterion(output, tgt_target)
            val_loss += loss.item()

            num_batches += 1
            
            src = None
            tgt = None
            tgt_input = None
            tgt_target = None
            output = None
            loss = None

    val_loss /= num_batches
    return val_loss

定义函数来训练模型。

In [84]:
def train(model, train_dataloader, val_dataloader, optimizer, criterion, device, epochs=10, early_stop_patience=5, model_save_path=None):
    best_val_loss = float('inf')
    best_val_epoch = 0
    stop_count = 0

    for epoch in range(epochs):
        train_loss = train_epoch(model, train_dataloader, optimizer, criterion, device)
        val_loss = eval_epoch(model, val_dataloader, criterion, device)

        print(f'Epoch {epoch+1}, train_loss: {train_loss:.3f}, val_loss: {val_loss:.3f}')

        if val_loss < best_val_loss:
            if model_save_path is not None:
                 torch.save(model.state_dict(), model_save_path)
            best_val_loss = val_loss
            best_val_epoch = epoch
            stop_count = 0
        else:
            stop_count += 1
            if stop_count >= early_stop_patience:
                print(f'Early stopping at epoch {epoch+1}')
                break

    print(f'Best validation loss: {best_val_loss:.3f}, at epoch {best_val_epoch+1}')

最后，我们可以调用这些方法来训练模型。

In [85]:
if __name__ == '__main__':
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

    # train_dataset = LibriSpeechDataset('data', train=True)
    # train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)

    # val_dataset = LibriSpeechDataset('data', train=False)
    # val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False)
    train_dataset = LibriSpeechDataset('data', train=True, max_audio_length=320000)#采样率是16000，视频最长给定20秒
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=pad_sequence)
    val_dataset = LibriSpeechDataset('data', train=False, max_audio_length=320000)
    val_dataloader = DataLoader(val_dataset, batch_size=32, shuffle=False, collate_fn=pad_sequence)
    
    model = ASRModel(num_layers=6, nhead=8, d_model=512, dim_feedforward=2048, input_dim=26, output_dim=28, dropout=0.1)
    model.to(device)

    optimizer = torch.optim.Adam(model.parameters(), lr=0.001)
    criterion = nn.CrossEntropyLoss(ignore_index=-1)

    train(model, train_dataloader, val_dataloader, optimizer, criterion, device, epochs=10, early_stop_patience=5)

RuntimeError: [enforce fail at ..\c10\core\CPUAllocator.cpp:73] data. DefaultCPUAllocator: not enough memory: you tried to allocate 1969920000 bytes. Buy new RAM!

从网站https://openslr.org/12/下载并解压测试数据集，然后将数据转换为模型可以识别的格式。可以使用类似以下代码进行MFCC特征提取：

In [None]:
import librosa

def extract_features(file_path):
    y, sr = librosa.load(file_path, sr=16000)
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=26, hop_length=256, n_fft=512)
    return mfccs.T

使用上述代码可以将音频文件转换为26维的MFCC特征。请注意，需要保留音频文件的完整路径以便后续训练；也可以在测试时读取音频文件以进行预测。
接下来，使用已经训练好的模型加载数据，这里直接使用之前写好的ASRModel。以下是一个完整的测试代码示例：

In [None]:
import torch
from torch.utils.data import DataLoader
# from data import LibriSpeechDataset
# from model import ASRModel
def get_all_flac_files(folder_path):
    file_paths = []
    for root, dirs, files in os.walk(folder_path):
        for file in files:
            if file.endswith('.flac'):
                file_path = os.path.join(root, file)
                file_paths.append(file_path)
    return file_paths
def test(model_path, rootpath):
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
    dataset_path = get_all_flac_files(rootpath)
    print(dataset_path)
    # 加载测试集
    test_dataset = LibriSpeechDataset(dataset_path, train=False, transform=extract_features)
    test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)
    
    # 加载模型
    model = ASRModel(num_layers=6, nhead=8, d_model=512, dim_feedforward=2048, input_dim=26, output_dim=28, dropout=0.1)
    model.load_state_dict(torch.load(model_path))
    model.to(device)
    model.eval()

    # 测试的主循环
    with torch.no_grad():
        for batch_idx, (src, _) in enumerate(test_dataloader):
            src = src.to(device)
            
            # 生成文本结果
            output = model.generate(src, src_mask=None, max_len=100, start_symbol=0)
            print(f"Batch {batch_idx+1}, predicted transcript: {output}")


if __name__ == '__main__':
    model_path = '/path/to/model/ASRModel.pth'
    dataset_path = 'data/test-clean/LibriSpeech'
    test(model_path, dataset_path)