In [2]:
import torch
from torch import nn, optim
from torch.nn import Conv2d, Linear, Sequential, Flatten
from torch.utils.data import Dataset, DataLoader
from tqdm import tqdm
import random
from glob import glob
import numpy as np
from functools import partial
from python_speech_features import mfcc, delta # 导入音频特征提取工具包
import scipy.io.wavfile as wav




In [3]:
#数据预处理
data_path = 'FSDD'
waves = glob("{}/*.wav".format(data_path),recursive=True)
print("总数据数量： ",len(waves))



总数据数量：  3000


In [4]:

    #打乱列表
random.shuffle(waves)
    #划分数据集 train 0.9 test/val 0.05
train_waves = waves[:int(len(waves)*0.9)]
val_waves = waves[int(len(waves)*0.9):int(len(waves)*0.95)]
test_waves = waves[int(len(waves)*0.95):]
print("训练集数目:\t",len(train_waves),"\n验证集数目:\t",len(val_waves),"\n测试集数目:\t",len(test_waves))

训练集数目:	 2700 
验证集数目:	 150 
测试集数目:	 150


In [5]:

#MFCC特征提取
def get_mfcc(data, fs):
    # MFCC特征提取
    wav_feature = mfcc(data, fs)

    # 特征一阶差分
    d_mfcc_feat = delta(wav_feature, 1)
    # 特征二阶差分
    d_mfcc_feat2 = delta(wav_feature, 2)
    # 特征拼接
    feature = np.concatenate(
        [wav_feature.reshape(1, -1, 13), d_mfcc_feat.reshape(1, -1, 13), d_mfcc_feat2.reshape(1, -1, 13)], 0)

    # 对数据进行截取或者填充
    if feature.shape[1] > 64:
        feature = feature[:, :64, :]
    else:
        feature = np.pad(feature, ((0, 0), (0, 64 - feature.shape[1]), (0, 0)), 'constant')
    # 通道转置(HWC->CHW)
    feature = feature.transpose((2, 0, 1))

    return feature

In [6]:

# 读取音频样例
fs, signal = wav.read('FSDD/0_george_0.wav')
# 特征提取
feature = get_mfcc(signal, fs)
print('特征形状(CHW):', feature.shape, type(feature))

特征形状(CHW): (13, 3, 64) <class 'numpy.ndarray'>


In [7]:
#标签提取
def preproess(waves):
    datalist=[]
    lablelist=[]
    for w in tqdm(waves):
        lablelist.append([int(w[5])])
        fs, signal = wav.read(w)
        f = get_mfcc(signal, fs)
        datalist.append(f)
    return np.array(datalist),np.array(lablelist)

train_data,train_lable=preproess(train_waves)
val_data,val_lable=preproess(val_waves)
test_data,test_lable=preproess(test_waves)

100%|██████████| 2700/2700 [00:05<00:00, 497.83it/s]
100%|██████████| 150/150 [00:00<00:00, 508.48it/s]
100%|██████████| 150/150 [00:00<00:00, 541.52it/s]


In [9]:
for data in train_lable:
    print(train_lable) 
    print(train_lable.shape)
    break

[[5]
 [4]
 [8]
 ...
 [3]
 [0]
 [7]]
(2700, 1)


In [11]:
#组装数据集
class MyDataset(torch.utils.data.Dataset):
    def __init__(self,audio,text):
        super(MyDataset, self).__init__()
        self.text = text
        self.audio = audio

    def __getitem__(self, index):
        return self.audio[index],self.text[index]

    def __len__(self):
        return self.audio.shape[0]

In [12]:

train_dataset = MyDataset(train_data,train_lable)
train_loader = DataLoader(train_dataset, batch_size=64, shuffle=True,drop_last=True)

test_dataset = MyDataset(test_data,test_lable)
test_loader = DataLoader(test_dataset, batch_size=64, shuffle=True,drop_last=True)

val_dataset = MyDataset(val_data,val_lable)
val_loader = DataLoader(val_dataset, batch_size=64, shuffle=True,drop_last=True)

In [19]:
#打印一批数据信息看看
for data in train_loader:
    audio, text = data
    text = text.view(-1)
    print(audio.shape)
    # text = text.unsqueeze(1)
    print(text)
    print(text.shape)
    break





torch.Size([64, 13, 3, 64])
tensor([8, 5, 6, 8, 9, 2, 5, 0, 4, 3, 9, 0, 7, 2, 4, 7, 9, 6, 3, 1, 9, 2, 9, 0,
        3, 1, 5, 0, 7, 9, 9, 5, 4, 9, 1, 1, 1, 1, 5, 5, 1, 6, 3, 3, 8, 5, 5, 1,
        1, 1, 1, 7, 8, 9, 9, 7, 0, 7, 7, 8, 3, 9, 1, 2], dtype=torch.int32)
torch.Size([64])


In [20]:
#组装CNN网络
class Mynet(nn.Module):
    def __init__(self):
        super(Mynet,self).__init__()

        self.module1 = Sequential(
                Conv2d(in_channels=13, out_channels=16, kernel_size=(3,3), stride=1, padding=1),
                Conv2d(in_channels=16, out_channels=16, kernel_size=(3,2), stride=(1,2), padding=(1,0)),
                Conv2d(in_channels=16, out_channels=32, kernel_size=3, stride=1, padding=1),
                Conv2d(in_channels=32, out_channels=32, kernel_size=(3,2), stride=(1,2), padding=(1,0)),
                Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1, padding=1),
                Conv2d(in_channels=64, out_channels=64, kernel_size=(3,2), stride=2, padding=0),
                Flatten(),
                Linear(in_features=512, out_features=128),
                Linear(in_features=128, out_features=10)
        )

    def forward(self,x):
        x = self.module1(x)
        return x



In [21]:
mynet = Mynet()
print(mynet)

Mynet(
  (module1): Sequential(
    (0): Conv2d(13, 16, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): Conv2d(16, 16, kernel_size=(3, 2), stride=(1, 2), padding=(1, 0))
    (2): Conv2d(16, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): Conv2d(32, 32, kernel_size=(3, 2), stride=(1, 2), padding=(1, 0))
    (4): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (5): Conv2d(64, 64, kernel_size=(3, 2), stride=(2, 2))
    (6): Flatten(start_dim=1, end_dim=-1)
    (7): Linear(in_features=512, out_features=128, bias=True)
    (8): Linear(in_features=128, out_features=10, bias=True)
  )
)


In [28]:
input = torch.ones((64,13,3,64))
output = mynet(input)
print(output.shape)

torch.Size([64, 10])


In [23]:
#模型训练
epochs = 20

model = Mynet()

#记录训练次数
total_train_step = 0
total_test_step = 0
#学习速率
lr_rate = 1e-2

# 定义损失函数和优化器 
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=lr_rate)

In [26]:
for data in train_loader:
    data, lable = data
    
    print('imgs.shape:  ',data.shape)
    print('targets:',lable)
    print(lable.shape)
    break

imgs.shape:   torch.Size([64, 13, 3, 64])
targets: tensor([[5],
        [5],
        [9],
        [5],
        [8],
        [7],
        [3],
        [2],
        [2],
        [3],
        [0],
        [6],
        [2],
        [3],
        [2],
        [9],
        [1],
        [6],
        [3],
        [9],
        [6],
        [7],
        [8],
        [9],
        [2],
        [0],
        [6],
        [2],
        [3],
        [4],
        [3],
        [0],
        [6],
        [4],
        [6],
        [3],
        [2],
        [9],
        [3],
        [4],
        [7],
        [2],
        [9],
        [0],
        [4],
        [0],
        [4],
        [0],
        [9],
        [0],
        [6],
        [7],
        [2],
        [0],
        [9],
        [2],
        [0],
        [6],
        [8],
        [2],
        [2],
        [9],
        [1],
        [9]], dtype=torch.int32)
torch.Size([64, 1])


In [24]:
# 训练模型
for i in range(epochs):
    print("------------第{}轮训练开始----------".format(i+1))
    
    #训练步骤开始
    model.train()
    for data in train_loader:
        audio, text = data
        audio = audio.float()
        text = text.view(-1)
        text = text.float()
        

        output = model(audio)
        
        loss = loss_fn(audio,text)

        #反向传播 调优
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_train_step += 1
        print("训练次数:{},Loss:{}".format(total_train_step,loss.item()))

    #测试步骤开始
    model.eval()
    total_test_loss = 0
    total_accuracy = 0
    with torch.no_grad():
        for data in test_loader:
            audio, text = data
            audio = audio.float()
            text = text.float()
            text = text.argmax(dim=1)
        
            output = model(audio)
            loss = loss_fn(output,text)
            total_test_loss += loss.item()

            accuray = (output.argmax(1) == text).sum()
            total_accuracy += accuray

    print("测试集上的Loss:{}".format(total_test_loss))
    print("测试集上的Accuracy:{}".format(total_accuracy/total_test_step))
    total_test_step += 1

------------第1轮训练开始----------


RuntimeError: only batches of spatial targets supported (3D tensors) but got targets of dimension: 1