In [52]:
# -*- coding: utf-8 -*-
# @Time    : 2021/12/12 20:09
# @Author  : XiuYuan Chen
# @File    : mfcc.ipynb
import numpy as np
import torchaudio
import torch
import torch.nn as nn
import torch.nn.functional as F
import os
from torch.utils.data import DataLoader, Dataset
from sklearn.metrics import accuracy_score


class M5(nn.Module):
    """Four-stage 1-D convolutional classifier that returns log-probabilities.

    Every stage is ``Conv1d -> BatchNorm1d -> ReLU -> MaxPool1d(2)``.  Once the
    four stages have run, the remaining length axis is averaged away and one
    linear layer maps the ``2 * n_channel`` features to ``n_output`` classes.

    Args:
        n_input: number of channels expected by the first convolution.
        n_output: number of classes produced by the final linear layer.
        stride: accepted but not referenced by any layer of this class.
        n_channel: channel width of stages 1-2; stages 3-4 use twice as many.
    """

    def __init__(self, n_input=1, n_output=2, stride=16, n_channel=32):
        super().__init__()
        wide = 2 * n_channel

        # Stage 1: the only stage with a 5-wide kernel.
        self.conv1 = nn.Conv1d(n_input, n_channel, kernel_size=5, stride=1, padding=2)
        self.bn1 = nn.BatchNorm1d(n_channel)
        self.pool1 = nn.MaxPool1d(2)

        # Stage 2: same width as stage 1.
        self.conv2 = nn.Conv1d(n_channel, n_channel, kernel_size=3, padding=1)
        self.bn2 = nn.BatchNorm1d(n_channel)
        self.pool2 = nn.MaxPool1d(2)

        # Stage 3: doubles the channel count.
        self.conv3 = nn.Conv1d(n_channel, wide, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(wide)
        self.pool3 = nn.MaxPool1d(2)

        # Stage 4: keeps the doubled width.
        self.conv4 = nn.Conv1d(wide, wide, kernel_size=3, padding=1)
        self.bn4 = nn.BatchNorm1d(wide)
        self.pool4 = nn.MaxPool1d(2)

        # Classification head.
        self.fc1 = nn.Linear(wide, n_output)

    def forward(self, x):
        """Run the network on ``x`` and return log-softmax scores over dim 1."""
        stages = (
            (self.conv1, self.bn1, self.pool1),
            (self.conv2, self.bn2, self.pool2),
            (self.conv3, self.bn3, self.pool3),
            (self.conv4, self.bn4, self.pool4),
        )
        for conv, norm, pool in stages:
            x = pool(F.relu(norm(conv(x))))

        # Average over whatever length is left, then drop that singleton axis
        # so the linear layer sees one feature vector per batch element.
        pooled = F.avg_pool1d(x, x.shape[-1])
        features = torch.squeeze(pooled.permute(0, 2, 1), dim=1)
        return F.log_softmax(self.fc1(features), dim=1)
class myloss(nn.Module):
    """Negated mean of the score each row assigns to its labelled column.

    When ``x`` holds log-probabilities (for example the output of
    ``log_softmax``) the result is the mean negative log-likelihood.
    """

    def __init__(self):
        super().__init__()

    def forward(self, x, label):
        """Return ``-mean(x[i, label[i]])`` taken over the rows of ``x``.

        Args:
            x: tensor whose first axis indexes samples and second axis classes.
            label: integer class indices; flattened before use.
        """
        row_idx = torch.arange(x.shape[0])
        col_idx = label.reshape(-1)
        picked = x[row_idx, col_idx]
        return -picked.mean()
class MyDataset(Dataset):
    """Dataset that serves MFCC frames from a positive and a negative audio folder.

    Item ``index`` combines the ``index``-th file of ``po_root`` (label 1) with
    the ``index``-th file of ``ne_root`` (label 0).  When one folder has fewer
    files than the other, items past its end contain only the longer folder's
    file.  Each file is turned into MFCCs and the transform's last axis (the
    time-frame axis according to the torchaudio ``MFCC`` documentation) is moved
    to the front, so every frame becomes one sample shaped
    ``(1, n_mfcc)`` with its own label.

    Args:
        po_root: directory whose files are labelled 1.
        ne_root: directory whose files are labelled 0.
        n_fft: FFT size forwarded to the mel spectrogram.
        hop_length: hop length forwarded to the mel spectrogram.
        n_mels: number of mel bins forwarded to the mel spectrogram.
        n_mfcc: number of MFCC coefficients to keep.
    """

    def __init__(self, po_root, ne_root, n_fft=2048, hop_length=512, n_mels=128, n_mfcc=128):
        # os.listdir returns entries in arbitrary order; sorting makes the
        # index -> file mapping reproducible across runs and machines.
        self.pfilename_list = sorted(os.listdir(po_root))
        self.nfilename_list = sorted(os.listdir(ne_root))
        self.leng = max(len(self.nfilename_list), len(self.pfilename_list))
        self.po_root, self.ne_root = po_root, ne_root
        self.n_fft, self.hop_length = n_fft, hop_length
        self.n_mels, self.n_mfcc = n_mels, n_mfcc
        # MFCC transforms keyed by sample rate, built on first use.
        self._mfcc_by_rate = {}

    def _mfcc_transform(self, sample_rate):
        """Return the MFCC transform for ``sample_rate``, creating it only once."""
        transform = self._mfcc_by_rate.get(sample_rate)
        if transform is None:
            transform = torchaudio.transforms.MFCC(
                sample_rate=sample_rate,
                n_mfcc=self.n_mfcc,
                melkwargs={
                    'n_fft': self.n_fft,
                    'n_mels': self.n_mels,
                    'hop_length': self.hop_length,
                    'mel_scale': 'htk',
                }
            )
            self._mfcc_by_rate[sample_rate] = transform
        return transform

    def __getitem__(self, index):
        """Return ``(frames, labels)`` for the files stored at position ``index``.

        Returns:
            frames: MFCC frames of the selected file(s), concatenated on dim 0.
            labels: int64 tensor with one entry per frame (1 positive, 0 negative).

        Raises:
            IndexError: if ``index`` is past the end of both folders.
        """
        sources = []
        if index < len(self.pfilename_list):
            sources.append((os.path.join(self.po_root, self.pfilename_list[index]), 1))
        if index < len(self.nfilename_list):
            sources.append((os.path.join(self.ne_root, self.nfilename_list[index]), 0))
        if not sources:
            # Without this, torch.cat([]) fails with a non-IndexError exception,
            # which breaks plain ``for item in dataset`` iteration.
            raise IndexError('MyDataset index out of range: {}'.format(index))

        frames, labels = [], []
        for path, target in sources:
            waveform, sample_rate = torchaudio.load(
                path,
                frame_offset=0, num_frames=-1, normalize=True, channels_first=True)
            # Only the first channel is used; the 0:1 slice keeps the channel axis.
            mfcc = self._mfcc_transform(sample_rate)(waveform[0:1]).permute(2, 0, 1)
            frames.append(mfcc)
            labels.append(torch.full((mfcc.shape[0],), target, dtype=torch.long))
        return torch.cat(frames, dim=0), torch.cat(labels, dim=0)

    def __len__(self):
        """Number of items: the file count of the larger folder."""
        return self.leng
def coll_fun(batch):
    """Merge ``(data, label)`` items into one batch by concatenating on dim 0.

    Items may contribute different numbers of rows, so the parts are
    concatenated rather than stacked.
    """
    data_parts, label_parts = [], []
    for sample in batch:
        data_parts.append(sample[0])
        label_parts.append(sample[1])
    return torch.cat(data_parts, dim=0), torch.cat(label_parts, dim=0)


In [50]:
# Train M5 on per-frame MFCC samples and report the test loss after every epoch.

# Use the first GPU when one is available; otherwise run on the CPU instead of
# failing on machines without CUDA.
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')
po_path,ne_path = './data/recorddata', './data/attackdata'
batch_size, num_epoch = 5, 10
model = M5(n_input=1, n_output=2).to(device)
optimizer, cri = torch.optim.Adam(model.parameters(), lr=1e-2), myloss()
train_dataset = MyDataset(po_path,ne_path)
# coll_fun concatenates items, so one "batch" of 5 items holds all of their frames.
train_loader = DataLoader(dataset=train_dataset, batch_size=batch_size, shuffle=True, collate_fn=coll_fun)
test_dataset = MyDataset('./data/test_p','./data/test_n')
test_loader = DataLoader(dataset=test_dataset, batch_size=batch_size, collate_fn=coll_fun)
for epoch in range(num_epoch):
    # ---- training pass ----
    model.train()
    epoch_trainloss = []
    for i, (x_train, y_train) in enumerate(train_loader):
        x_train, y_train = x_train.to(device), y_train.to(device)
        ypred = model(x_train)
        optimizer.zero_grad()
        loss = cri(ypred, y_train)
        loss.backward()
        optimizer.step()
        epoch_trainloss.append(loss.item())
    print('epoch:',epoch,' average train loss:', np.array(epoch_trainloss).mean())

    # ---- evaluation pass ----
    model.eval()
    epoch_testloss = []
    with torch.no_grad():
        for i, (x_test, y_test) in enumerate(test_loader):
            x_test, y_test = x_test.to(device), y_test.to(device)
            ypred = model(x_test)
            loss = cri(ypred, y_test)
            epoch_testloss.append(loss.item())
    # Report one number per epoch (mean over test batches), matching how the
    # training loss is reported; with a single test batch the value is unchanged.
    if epoch_testloss:
        print('epoch:',epoch,' test_loss:', np.array(epoch_testloss).mean())
# Persist only the learned weights (state_dict), not the whole module object.
torch.save(model.state_dict(), 'parameter.pkl')


epoch: 0  average train loss: 0.15798454421261945
epoch: 0  test_loss: 0.1514187753200531
epoch: 1  average train loss: 0.05794688782447742
epoch: 1  test_loss: 0.07479961216449738
epoch: 2  average train loss: 0.06342331392483579
epoch: 2  test_loss: 0.07330065220594406
epoch: 3  average train loss: 0.04349425724811024
epoch: 3  test_loss: 0.017789103090763092
epoch: 4  average train loss: 0.037353785527456135
epoch: 4  test_loss: 0.010847906582057476
epoch: 5  average train loss: 0.04609273037769728
epoch: 5  test_loss: 0.00909013394266367
epoch: 6  average train loss: 0.036094438672686614
epoch: 6  test_loss: 0.008861272595822811
epoch: 7  average train loss: 0.04210061673074961
epoch: 7  test_loss: 0.022814705967903137
epoch: 8  average train loss: 0.03170357159494112
epoch: 8  test_loss: 0.008115837350487709
epoch: 9  average train loss: 0.035762881549696125
epoch: 9  test_loss: 0.005954447202384472
