## 导入包

In [None]:
import numpy as np
import pandas as pd

import torch
import torch.nn as nn
from torch.utils.data import Dataset
from sklearn.model_selection import train_test_split
# from autosklearn.classification import AutoSklearnClassifier
import torch.utils.data as data
import torch.nn as nn
import matplotlib.pyplot as plt
import time

In [None]:
def load_data_11_39(BATCH_SIZE = 1024,
                    data_dir='../input/ml2021spring-hw2/timit_11/timit_11',
                    val_ratio=0.1):
    """Load the TIMIT .npy files and wrap them in PyTorch data loaders.

    Every sample is reshaped to (11, 39) so it can be consumed as a sequence
    by a recurrent model. The trailing ``val_ratio`` share of the training
    samples is held out as the dev set.

    Args:
        BATCH_SIZE: batch size used by all three loaders.
        data_dir: directory holding ``train_11.npy``, ``train_label_11.npy``
            and ``test_11.npy``.
        val_ratio: fraction of the training samples (taken from the end)
            used as the dev set.

    Returns:
        ``(train_loader, dev_loader, test_loader, train_x, dev_x)`` where
        ``train_x`` / ``dev_x`` are the feature tensors of the two splits.
    """
    train = np.load('{}/train_11.npy'.format(data_dir))
    label = np.load('{}/train_label_11.npy'.format(data_dir))
    test = np.load('{}/test_11.npy'.format(data_dir))

    # Sequence layout for the LSTM.
    train = train.reshape(-1, 11, 39)
    test = test.reshape(-1, 11, 39)

    print('Size of training data: {}'.format(train.shape))
    print('Size of testing data: {}'.format(test.shape))

    # numpy -> tensor. Per the original note the labels are stored as
    # strings, so they are cast to integers first.
    print(type(train), type(label[0]))
    train = torch.from_numpy(train).float()
    # `np.int` was removed in NumPy 1.24; use an explicit 64-bit integer,
    # which is also the dtype of a LongTensor.
    label = torch.from_numpy(label.astype(np.int64))
    test = torch.from_numpy(test).float()

    # Split by position, not randomly: the original author observed that a
    # random split of speech frames makes train and dev identically
    # distributed and gives a less useful dev score.
    percent = int(train.shape[0] * (1 - val_ratio))
    train_x, dev_x = train[:percent], train[percent:]
    train_y, dev_y = label[:percent], label[percent:]
    print('Size of training set: {}'.format(train_x.shape))
    print('Size of dev set: {}'.format(dev_x.shape))

    train_set = TIMITDataset(train_x, train_y)
    dev_set = TIMITDataset(dev_x, dev_y)
    test_set = TIMITDataset(test, None)
    train_loader = data.DataLoader(dataset=train_set, batch_size=BATCH_SIZE, shuffle=True)
    # The dev set is also too large for a single batch, so it is batched too.
    dev_loader = data.DataLoader(dataset=dev_set, batch_size=BATCH_SIZE, shuffle=False)
    test_loader = data.DataLoader(dataset=test_set, batch_size=BATCH_SIZE, shuffle=False)
    return train_loader, dev_loader, test_loader, train_x, dev_x

In [None]:
class TIMITDataset(Dataset):
    """Index-aligned dataset of feature samples and optional labels.

    Args:
        sounds: indexable collection of samples (anything supporting
            ``len()`` and ``[]``).
        labels: indexable collection aligned with ``sounds``, or ``None``
            for an unlabeled (test) set.
    """

    def __init__(self, sounds, labels):
        self.sounds = sounds
        self.labels = labels

    def __getitem__(self, idx):
        """Return ``sounds[idx]`` alone when unlabeled, else ``(sound, label)``."""
        # Identity check: `== None` on an array is an elementwise comparison
        # whose truth value is ambiguous.
        if self.labels is None:
            return self.sounds[idx]
        return self.sounds[idx], self.labels[idx]

    def __len__(self):
        """Return the number of samples."""
        return len(self.sounds)


## 网络结构

In [None]:
class FNN(nn.Module):
    """Fully connected classifier made of three identical hidden blocks.

    Each hidden block is BatchNorm1d -> Linear -> Dropout -> ReLU, followed
    by a final linear output layer.

    Args:
        n_hidden: width of every hidden layer.
        n_input: number of input features per sample (default 429).
        n_classes: number of output scores per sample (default 39).
        dropout: dropout probability used in every hidden block.
    """

    def __init__(self, n_hidden, n_input=429, n_classes=39, dropout=0.5):
        super().__init__()
        self.layer1 = self._hidden_block(n_input, n_hidden, dropout)
        self.layer2 = self._hidden_block(n_hidden, n_hidden, dropout)
        self.layer3 = self._hidden_block(n_hidden, n_hidden, dropout)
        self.out = nn.Linear(n_hidden, n_classes)

    @staticmethod
    def _hidden_block(in_features, out_features, dropout):
        """Build one BatchNorm -> Linear -> Dropout -> ReLU block."""
        return nn.Sequential(
            nn.BatchNorm1d(num_features=in_features, momentum=0.1),
            nn.Linear(in_features, out_features),
            nn.Dropout(dropout),
            nn.ReLU(),
        )

    def forward(self, x):
        """Return raw class scores of shape (batch, n_classes).

        Inputs with more than two dimensions, e.g. (batch, 11, 39), are
        flattened to (batch, features) first; 2-D inputs pass through
        unchanged.
        """
        x = x.reshape(x.size(0), -1)
        x = self.layer1(x)
        x = self.layer2(x)
        x = self.layer3(x)
        return self.out(x)

In [None]:
class LSTM(nn.Module):
    """Three stacked single-layer LSTMs followed by a linear output layer.

    Every recurrent stage is followed by Dropout(0.5) and ReLU. The input is
    batch-first with 39 features per time step; only the last time step of
    the final stage feeds the 39-way output layer.

    Args:
        n_hidden: hidden size shared by all three recurrent stages.
    """

    def __init__(self, n_hidden):
        super().__init__()
        # Each recurrent layer stays wrapped in its own Sequential so the
        # sub-module names (layer1.0, layer2.0, ...) remain as they were.
        self.layer1 = nn.Sequential(
            nn.LSTM(input_size=39, hidden_size=n_hidden, num_layers=1, batch_first=True)
        )
        self.layer1_1 = nn.Sequential(nn.Dropout(0.5), nn.ReLU())
        self.layer2 = nn.Sequential(
            nn.LSTM(input_size=n_hidden, hidden_size=n_hidden, num_layers=1, batch_first=True)
        )
        self.layer2_1 = nn.Sequential(nn.Dropout(0.5), nn.ReLU())
        self.layer3 = nn.Sequential(
            nn.LSTM(input_size=n_hidden, hidden_size=n_hidden, num_layers=1, batch_first=True)
        )
        self.layer3_1 = nn.Sequential(nn.Dropout(0.5), nn.ReLU())
        self.out = nn.Linear(n_hidden, 39)

    def forward(self, x):
        """Return output scores of shape (batch, 39) for a batch-first sequence."""
        stages = (
            (self.layer1, self.layer1_1),
            (self.layer2, self.layer2_1),
            (self.layer3, self.layer3_1),
        )
        for recurrent, post in stages:
            # nn.LSTM returns (output, (h_n, c_n)); only the per-step output
            # sequence is passed on.
            sequence, _state = recurrent(x)
            x = post(sequence)
        last_step = x[:, -1, :]
        return self.out(last_step)

In [None]:
def get_device():
    """Return 'cuda' when PyTorch reports an available CUDA device, else 'cpu'."""
    if torch.cuda.is_available():
        return 'cuda'
    return 'cpu'


print('硬件: ', get_device())

## NN训练

In [None]:
def Nerual_Network():
    """Train the LSTM classifier, plot its curves and write test predictions.

    Steps: load the data loaders, train for a fixed number of epochs while
    tracking loss/accuracy on the train and dev splits, save the model to
    '3L_2048_LSTM.pkl', plot the loss and accuracy curves, then predict the
    test set and write the result to 'sub_NN.csv'.

    Returns:
        The trained model.
    """
    time_start = time.time()
    BATCH_SIZE = 2048
    LR = 0.001
    EPOCH = 3
    N_HIDDEN = 2048

    # Use the detected device so the function also runs on CPU-only machines.
    device = torch.device(get_device())

    train_loader, dev_loader, test_loader, train_x, dev_x = load_data_11_39(BATCH_SIZE)

    model = LSTM(N_HIDDEN).to(device)
    optimizer = torch.optim.Adam(model.parameters(), lr=LR, weight_decay=1e-4)
    loss_func = nn.CrossEntropyLoss()       # applies softmax internally

    # Per-epoch curves.
    Train_Loss = []
    Dev_Loss = []
    Train_Acc = []
    Dev_Acc = []

    for e in range(EPOCH):
        train_loss, train_acc = 0.0, 0
        dev_loss, dev_acc = 0.0, 0

        # train(): dropout (and batch-norm updates) active;
        # eval(): dropout off, stored batch-norm statistics are used.
        model.train()
        for b_x, b_y in train_loader:
            b_x, b_y = b_x.to(device), b_y.to(device)
            optimizer.zero_grad()
            output = model(b_x)
            loss = loss_func(output, b_y)
            loss.backward()
            optimizer.step()

            # .item() detaches the value; summing the loss tensors themselves
            # would keep autograd history alive across the whole epoch.
            train_loss += loss.item()
            train_pred = torch.max(output, 1)[1]
            train_acc += (train_pred == b_y).sum().item()      # number of correct predictions

        # Evaluate on the dev split.
        model.eval()
        with torch.no_grad():
            for b_x, b_y in dev_loader:
                b_x, b_y = b_x.to(device), b_y.to(device)
                output = model(b_x)
                dev_loss += loss_func(output, b_y).item()
                dev_pred = torch.max(output, 1)[1]
                dev_acc += (dev_pred == b_y).sum().item()      # number of correct predictions

        # Loss is averaged per batch, accuracy per sample.
        epoch_train_loss = train_loss / len(train_loader)
        epoch_train_acc = train_acc / len(train_x)
        epoch_dev_loss = dev_loss / len(dev_loader)
        epoch_dev_acc = dev_acc / len(dev_x)

        print('Epoch: ', e,
              '| train loss: %.4f' % epoch_train_loss,
              '| train accuracy: %.4f' % epoch_train_acc,
              '| dev loss: %.4f' % epoch_dev_loss,
              '| dev accuracy: %.4f' % epoch_dev_acc,
              )
        Train_Loss.append(epoch_train_loss)
        Train_Acc.append(epoch_train_acc)
        Dev_Loss.append(epoch_dev_loss)
        Dev_Acc.append(epoch_dev_acc)
    torch.save(model, '3L_2048_LSTM.pkl')
    plt.plot(Train_Loss)
    plt.plot(Dev_Loss)
    plt.title('Loss')
    plt.legend(['Training set', 'Dev_set'])
    plt.show()

    plt.plot(Train_Acc)
    plt.plot(Dev_Acc)
    plt.title('Accuracy')
    plt.legend(['Training set', 'Dev_set'])
    plt.show()

    # Write the test-set predictions.
    predict = []
    model.eval()
    with torch.no_grad():
        # Loop variable deliberately not named `data`, which is the alias of
        # the imported torch.utils.data module.
        for inputs in test_loader:
            outputs = model(inputs.to(device))
            test_pred = torch.max(outputs, 1)[1].cpu().numpy()
            predict.extend(test_pred)

    res = pd.DataFrame({'Id': list(range(len(predict))), 'Class': predict})
    res.to_csv('sub_NN.csv', index=False)
    print("NN模型输出完毕!")
    time_end = time.time()
    print("NN训练总时长: %ds" %(time_end - time_start))
    return model
Nerual_Network()

In [None]:
# NOTE(review): blocks this cell for 50000 seconds (about 13.9 hours);
# presumably a keep-alive for the hosted notebook session — confirm it is
# still wanted before re-running.
time.sleep(50000)