In [None]:
import torch.nn as nn


In [None]:
class VanillaRNN(nn.Module):
    def __init__(self):
        super(VanillaRNN, self).__init__()
        # 每个网络层后面跟的都是数据的shape
        self.rnn1 = nn.RNN(input_size=58, hidden_size=64, batch_first=True) # batchsize, length, H_in
        self.dense = nn.Linear(61 * 64, 1)  # batchsize, length * H_out
        self.sigmoid = nn.Sigmoid() # batchsize, 1

    def forward(self, x):
        # 原始数据读进来是[data samples, H_in, length]，需要转换维度到[data samples, length, H_in]
        x = x.permute(0, 2, 1)
        x, _ = self.rnn1(x)
        x = torch.flatten(x, 1, -1)
        x = self.dense(x)
        x = self.sigmoid(x)
        return x

class LSTMModel(nn.Module):
    def __init__(self):
        super(LSTMModel, self).__init__()

        self.rnn1 = nn.LSTM(input_size=58, hidden_size=128, dropout=0.2, batch_first=True) # batchsize, length, H_in
        self.rnn2 = nn.LSTM(input_size=128, hidden_size=128, dropout=0.2, batch_first=True)
        self.pooling = nn.AdaptiveAvgPool1d(1) # batchsize, length
        self.dense = nn.Linear(61, 1)
        self.sigmoid = nn.Sigmoid() # batchsize, 1

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x, _ = self.rnn1(x)
        x, _ = self.rnn2(x)
        x = self.pooling(x)
        x = x[:, :, 0]
        x = self.dense(x)
        x = self.sigmoid(x)
        return x

class BiLSTMModel(nn.Module):
    def __init__(self):
        super(BiLSTMModel, self).__init__()

        self.rnn1 = nn.LSTM(input_size=58, hidden_size=128, dropout=0.2, batch_first=True, bidirectional=True)# batchsize, length, H_in
        self.rnn2 = nn.LSTM(input_size=128 * 2, hidden_size=128, dropout=0.2, batch_first=True, bidirectional=True)# batchsize, length, 2 * H_out
        self.pooling = nn.AdaptiveAvgPool1d(1) # batchsize, length
        self.dense = nn.Linear(61, 1) # batchsize, 1
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        x = x.permute(0, 2, 1)
        x, _ = self.rnn1(x)
        x, _ = self.rnn2(x)
        x = self.pooling(x)
        x = torch.flatten(x, 1, -1)
        x = self.dense(x)
        x = self.sigmoid(x)
        return x

In [None]:
import numpy as np
import torch
from torch.utils.data import DataLoader, random_split, TensorDataset

In [None]:
def load_data():
    X, Y = [], []
    data = np.load('./normal_data_processed.pkl', allow_pickle=True)
    print(data.shape)
    for item in data: # 遍历normal_data_processed里面的数据，整理成numpy array格式的数据，方便处理
        if item.shape != (58, 61) or np.sum(np.isnan(item)):
            continue
        X.append(np.array(item, dtype=np.float32))

    Y.extend(np.zeros(len(X))) # 正常数据对应的label为0

    data1 = np.load('./earthquakes_data_processed.pkl', allow_pickle=True)

    for item in data1: # 遍历地震的数据
        if item.shape != (58, 61) or np.sum(np.isnan(item)):
            continue
        X.append(np.array(item, dtype=np.float32))
    Y.extend(np.ones(len(X) - len(Y))) # 正常数据对应的label为1
    X = np.array(X, dtype=np.float32)
    Y = np.array(Y, dtype=np.float32)
    # 因为X是一整块正常的数据和一整块地震数据，这里做shuffle，把X和Y以相同的顺序打乱
    shuffle_ix = np.random.permutation(np.arange(len(X)))
    data = X[shuffle_ix]
    label = Y[shuffle_ix]
    return data, label

# 生成dataloader，dataloader是torch框架中数据管道的类，用于快速遍历数据
def get_dataloader(test=False):
    # 固定随机种子，保证每次数据分割是一致的
    torch.manual_seed(123)
    torch.random.manual_seed(123)

    x, y = load_data()
    dataset = TensorDataset(torch.tensor(x), torch.tensor(y))# 得到数据集

    train_dataset, valid_dataset, test_dataset = random_split(dataset, [int(0.7 * len(x)), int(0.2 * len(x)),
                                                                        len(x) - int(0.7 * len(x)) - int(0.2 * len(x))])
    if test: # 测试的话，意味着有训练好的模型，只需要在测试数据上测试一下即可
        test_dataloader = DataLoader(test_dataset, batch_size=1, num_workers=4, shuffle=False) # 制作
        return test_dataloader
    else: # 训练的话，需要训练计和验证集
        # 转换成dataloader
        train_dataloader = DataLoader(train_dataset, batch_size=128, num_workers=4, shuffle=True)
        valid_dataloader = DataLoader(valid_dataset, batch_size=128, num_workers=4, shuffle=False)
        test_dataloader = DataLoader(test_dataset, batch_size=128, num_workers=4, shuffle=False)
        return train_dataloader, valid_dataloader, test_dataloader

In [None]:
import torch
import os
import matplotlib.pyplot as plt
from tqdm import tqdm

from sklearn.metrics import classification_report, accuracy_score

In [None]:
epochs = 100
lr = 1e-3
model_dict = {
    'VanillaRNN': VanillaRNN(),
    'LSTM': LSTMModel(),
    'BiLSTM': BiLSTMModel(),
}
model_name = 'LSTM'

In [None]:
# 训练过程
def training():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 确认执行设备
    model = model_dict[model_name]
    train_dataloader, valid_dataloader, test_dataloader = get_dataloader(False) # 得到数据的迭代器，这里训练用train_dataloader
    # 验证用valid_dataloader 测试用test_dataloader
    loss_func = torch.nn.BCELoss().to(device) # loss计算函数
    optimizer = torch.optim.Adam(model.parameters(), lr=lr) # 模型参数优化器
    lr_scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.99) # 学习率调整器

    best_loss = 100000 # 用于判断验证集loss最小的参数
    history = {'train loss': [], 'test loss': [], 'train acc': [], 'test acc': []} # 保存历史loss
    if not os.path.exists('./checkpoints'): # 创建模型保存文件的文件夹
        os.mkdir('./checkpoints')
    for epoch in range(epochs):
        mean_train_loss = [] # 保存这个epoch中的训练集loss，用于绘图
        mean_test_loss = []# 保存历史的验证集loss，用于绘图

        labels = []
        preds = []
        model.train() # model切换为train模式
        for item in tqdm(train_dataloader): # 遍历数据集，获取Sample
            optimizer.zero_grad() # 清除优化器的梯度，这是每次更新参数必需的操作
            value, label = item[0], item[1] # 获取数据集sample中的value和label
            output = model(value.to(device)) # 得到输出

            loss = loss_func(output.squeeze(), label.to(device)) # 由 output和label计算loss
            loss.backward() # 由loss进行BP得到梯度
            optimizer.step() # 优化器更新参数
            mean_train_loss.append(loss.detach().cpu().numpy()) # 把loss放入历史信息
            preds.extend(np.argmax(output.detach().cpu().numpy(), 1)) # 保存预测值
            labels.extend(label.numpy()) # 保存label，以便计算当前轮的准确率

        test_preds = []
        test_labels = []
        model.eval() # model切换为评估模式
        for item in valid_dataloader: # 遍历验证集
            value, label = item[0], item[1] # 获取数据
            output = model(value.to(device)) # 得到输出
            loss = loss_func(output.squeeze(), label.to(device)) # 计算loss
            mean_test_loss.append(loss.detach().cpu().numpy()) # 保存loss
            test_preds.extend(np.argmax(output.detach().cpu().numpy(), 1))
            test_labels.extend(label.numpy())
        # 上面的mean_train_loss和mean_test_loss是保存一个epoch内的loss信息，历史信息保存的是每个epoch的loss
        history['train loss'].append(np.mean(mean_train_loss))
        history['test loss'].append(np.mean(mean_test_loss))
        history['train acc'].append(accuracy_score(preds, labels))
        history['test acc'].append(accuracy_score(test_preds, test_labels))
        print(  # 打印该epoch的两个loss和两个accuracy
            'Epoch {}/{}: train loss is {:.4f}, test loss is {:.4f}, train acc is {:.4f}, test acc is {:.4f}'
                .format(epoch + 1, 100, np.mean(mean_train_loss), np.mean(mean_test_loss),
                        accuracy_score(preds, labels), accuracy_score(test_preds, test_labels)))
        print(np.mean(labels), np.mean(test_labels))
        # lr_scheduler.step()  # 更新学习率
        if np.mean(mean_test_loss) < best_loss: # 这是判断保存验证集loss最小模型的操作
            best_loss = np.mean(mean_test_loss)
            torch.save(model, os.path.join('./checkpoints/', model_name))
    # 画出历史两个loss的折线图并保存下来
    plt.figure()
    plt.plot(np.arange(0, epochs), history['train loss']) # 折线图绘制
    plt.plot(np.arange(0, epochs), history['test loss'])
    plt.legend(['train loss', 'test loss'])
    plt.title('Loss') # 标题
    plt.savefig('loss_{}.png'.format(model_name)) # 保存
    plt.show() # 展示图片

    # 画出历史两个acc的折线图并保存下来
    plt.figure()
    plt.plot(np.arange(0, epochs), history['train acc']) # 折线图绘制
    plt.plot(np.arange(0, epochs), history['test acc'])
    plt.legend(['train acc', 'test acc'])
    plt.title('Accuracy') # 标题
    plt.savefig('Accuracy_{}.png'.format(model_name)) # 保存
    plt.show() # 展示图片

    ############ testing ############
    test_preds = []
    test_labels = []
    model.eval()  # model切换为评估模式
    for item in test_dataloader:  # 遍历验证集
        value, label = item[0], item[1]  # 获取数据
        output = model(value.to(device))  # 得到输出
        test_preds.extend(np.argmax(output.detach().cpu().numpy(), 1))
        test_labels.extend(label.numpy())

    print(classification_report(test_labels, test_preds, digits=4)) # 准确率，召回率，f1 score

def test():
    device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')  # 确认执行设备
    model = torch.load(os.path.join('./checkpoints/', model_name))
    test_dataloader = get_dataloader(True) # 仅获得测试数据的dataloader
    test_preds = []
    test_labels = []
    model.eval()  # model切换为评估模式
    for item in test_dataloader:  # 遍历验证集
        value, label = item[0], item[1]  # 获取数据
        output = model(value.to(device))  # 得到输出
        test_preds.extend(np.argmax(output.detach().cpu().numpy(), 1))
        test_labels.extend(label.numpy())

    print(classification_report(test_labels, test_preds, digits=4))

In [None]:
training()