In [None]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
import scipy.io as scio
import pandas as pd
import numpy as np
import torch.nn.functional as F
from torch.autograd import Variable
def Mydata(dataset, labels):
    y = []
    for i in range(len(labels)):
        x = (dataset[i], labels[i].item())
        y.append(x)
    return tuple(y)

In [None]:
# Device configuration
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Hyper-parameters

sequence_length = 15
input_size = 32 #输入特征维度
hidden_size = 256 #隐藏层单元数
num_layers = 3 #神经网络层数
num_classes = 3 #分类数
batch_size = 25 
num_epochs = 10
learning_rate = 0.008
embed_dim = input_size
num_samples = 1500

数据读取

In [None]:
dataFile = r'/content/test_data.mat'
data = scio.loadmat(dataFile)
test_data = data['test_data'].astype(np.double)
train_data = data['train_data'].astype(np.double)
train_data = torch.from_numpy(train_data)
test_data = torch.from_numpy(test_data)

label2 = data['test_label'].astype(np.long)
label1 = data['train_label'].astype(np.long)
train_labels = torch.zeros(num_samples)
test_labels = torch.zeros(num_samples)
for i in range(num_samples):
    if label1[0][i] == 1:
        train_labels[i] = 0
    elif label1[1][i] == 1:
        train_labels[i] = 1
    else:
        train_labels[i] = 2
    if label2[0][i] == 1:
        test_labels[i] = 0
    elif label2[1][i] == 1:
        test_labels[i] = 1
    else:
        test_labels[i] = 2 
train_dataset = Mydata(train_data,train_labels)
test_dataset = Mydata(test_data,test_labels)
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                           batch_size=batch_size,
                                           shuffle=True)

神经网络模型

In [None]:
# 双向循环神经网络 (many-to-one)
# 将接下来创建的变量类型均为Double
torch.set_default_tensor_type(torch.DoubleTensor)

class BiRNN(nn.Module):
    def __init__(self, input_size, hidden_size, num_layers, num_classes):
        super(BiRNN, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.GRU = nn.GRU(input_size,# x的特征维度
                            hidden_size,# 隐藏层单元数
                            num_layers,# 层数
                            batch_first=True,# 第一个维度设为 batch, 即:(batch_size, seq_length, input_size)
                            bidirectional=True) # 是否用双向
        self.weight_W = nn.Parameter(torch.Tensor(2 * hidden_size, 2 * hidden_size))
        self.weight_proj = nn.Parameter(torch.Tensor(2 * hidden_size, 1))
        self.fc = nn.Linear(hidden_size * 2, num_classes)  # 2 for bidirection
        nn.init.uniform_(self.weight_W, -0.1, 0.1)
        nn.init.uniform_(self.weight_proj, -0.1, 0.1)
    def forward(self, x):
        # x维度为(batch_size, time_step, input_size)
        # 隐层初始化
        # h0维度为(num_layers*direction_num, batch_size, hidden_size)
        h0 = torch.zeros(self.num_layers * 2, x.size(0), self.hidden_size).to(device)  # 2 for bidirection
        #input = input.permute(1, 0, 2)
        # LSTM前向传播，此时out维度为(batch_size, seq_length, hidden_size*2)
        # hn表示最后一个状态，维度与h0一样
        out, hn = self.GRU(x, h0)
        # attention机制
        x = out
        u = torch.tanh(torch.matmul(x, self.weight_W))
        att = torch.matmul(u, self.weight_proj)
        att_score = F.softmax(att, dim=1)
        scored_x = x * att_score
        # attention结束
        # 最后一步的输出,即(batch_size, -1, output_size)
        feat = torch.sum(scored_x, dim=1)
        #out = self.fc(out[:, -1, :])

        return self.fc(feat)

训练模块

In [None]:
model = BiRNN(input_size, hidden_size, num_layers, num_classes).to(device)
model = model.double()
# Loss and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=learning_rate)

# Train the model
total_step = len(train_loader)
# 下面是训练部分
for epoch in range(num_epochs):
    for i, (images, labels) in enumerate(train_loader):
        images = Variable(images.view(-1, sequence_length, input_size).to(device))
        labels = Variable(labels.to(device))
        # Forward pass
        outputs = model(images)
        #print(outputs, labels.long())
        loss = criterion(outputs, labels.long())
        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        #if (i + 1) % 15 == 0:
        #    print('Epoch [{}/{}], Step [{}/{}], Loss: {:.4f}'.format(epoch + 1, num_epochs, i + 1, total_step, loss.item()))


测试模块

In [None]:
# Test the model
with torch.no_grad():
    correct = 0
    total = 0
  
    for images, labels in test_loader:
        images = images.reshape(-1, sequence_length, input_size).to(device)
        labels = labels.to(device)
        outputs = model(images)
        _, predicted = torch.max(outputs.data, 1)
        total += labels.size(0)
        correct += (predicted == labels).sum().item()

   
    print('Test Accuracy of the model on the 1500 test images: {} %'.format(100 * correct / total))

Test Accuracy of the model on the 1500 test images: 98.66666666666667 %
