In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
from torch.autograd import Variable

In [None]:
# RNN-LSTM model (many-to-one): one prediction per input sequence
class LSTM(nn.Module):
    """Stacked LSTM followed by a linear layer applied to the last time step.

    Args:
        input_size: number of features per time step.
        hidden_size: width of each LSTM layer's hidden state.
        num_layers: number of stacked LSTM layers.
        num_classes: size of the output (logit) vector.
        device: accepted for backward compatibility only. The initial states
            are created on whatever device the input tensor lives on, so the
            module no longer depends on a notebook-level ``device`` variable.
    """

    def __init__(self, input_size, hidden_size, num_layers, num_classes,device):
        super(LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        # batch_first=True only affects the input/output layout (batch, seq, feature);
        # the hidden/cell states keep the (num_layers, batch, hidden) layout.
        self.lstm = nn.LSTM(input_size, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for x of shape (batch, seq, input_size)."""
        # Zero initial hidden and cell states, allocated with the same device
        # and dtype as the input so the model works wherever it has been moved.
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)

        # Forward propagate through the LSTM stack
        out, (h_n, c_n) = self.lstm(x, (h0, c0))

        # Decode the top layer's output at the last time step
        out = self.fc(out[:, -1, :])
        return out

In [None]:
# Sanity check: validate the LSTM on a small image-recognition dataset
# (scikit-learn's handwritten digits).
from sklearn import datasets
from sklearn.model_selection import train_test_split

digits = datasets.load_digits()
print(digits.images.shape)

# Preview the first 100 digits, each annotated with its label.
fig, axes = plt.subplots(10, 10, figsize=(8, 8))
fig.subplots_adjust(hspace=0.1, wspace=0.1)

for i, ax in enumerate(axes.flat):
    ax.imshow(digits.images[i], cmap='binary', interpolation='nearest')
    ax.text(0.05, 0.05, str(digits.target[i]),
            transform=ax.transAxes, color='green')
    ax.set_xticks([])
    ax.set_yticks([])

# Flatten every image into one row of pixel features. All sizes are derived
# from the data instead of being hard-coded (1797 samples, 64 pixels).
n_samples = len(digits.images)
image_shape = digits.images.shape[1:]
d2d = digits.images.reshape(n_samples, -1)
feature_cols = list(range(d2d.shape[1]))
df = pd.DataFrame(d2d)
df['target'] = digits.target

# 60/40 split over row positions; the fixed seed makes the split (and hence
# the reported accuracies) reproducible across kernel restarts.
itrain, itest = train_test_split(range(df.shape[0]), train_size=0.6, random_state=42)
set1 = {}
set1['Xtrain'] = df[feature_cols].iloc[itrain, :]
set1['Xtest'] = df[feature_cols].iloc[itest, :]
set1['ytrain'] = df.target.iloc[itrain]
set1['ytest'] = df.target.iloc[itest]

# Back to (n, height, width) arrays for the sequence models; -1 lets numpy
# infer the number of samples in each split.
x_train = np.array(set1['Xtrain']).reshape(-1, *image_shape)
x_test = np.array(set1['Xtest']).reshape(-1, *image_shape)
y_train = np.array(set1['ytrain']).reshape(-1)
y_test = np.array(set1['ytest']).reshape(-1)

In [None]:
# Hyper-parameters for the plain LSTM model
sequence_length = 8  # time steps per sample (each 8x8 image is fed as 8 steps)
input_size = 8  # features per time step
hidden_size = 256  # hidden state size
num_layers =  2 # number of stacked LSTM layers

num_classes = 10
batch_size = 256
num_epochs = 300
learning_rate = 0.001

# Prefer the second GPU as before, but fall back to the first GPU or the CPU
# so the notebook still runs on machines with fewer than two CUDA devices.
if torch.cuda.device_count() > 1:
    device = torch.device("cuda:1")
elif torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
# Build the plain LSTM classifier from the hyper-parameters above, then move
# its parameters to the selected device (the trailing expression also shows
# the module summary as the cell output).
lstm = LSTM(
    input_size=input_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    num_classes=num_classes,
    device=device,
)
lstm.to(device)

In [None]:
# Objective and optimiser for the plain LSTM:
# cross-entropy over the class logits, Adam over all model parameters.
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=lstm.parameters(), lr=learning_rate)

In [None]:
from torch.utils.data import Dataset
# Custom PyTorch dataset pairing samples with their labels
class GetLoader(Dataset):
    """Index-aligned view over a sample container and a label container.

    Args:
        data_root: indexable container of samples.
        data_label: indexable container of labels, aligned with ``data_root``.
    """

    def __init__(self, data_root, data_label):
        self.data, self.label = data_root, data_label

    def __getitem__(self, index):
        # Return the sample and its label together for the requested index.
        return self.data[index], self.label[index]

    def __len__(self):
        # Dataset size as used by DataLoader to build batches.
        return len(self.data)

In [None]:
# Wrap the train and test arrays as datasets of (image, label) pairs.
train, test = GetLoader(x_train, y_train), GetLoader(x_test, y_test)

In [None]:
# Shuffled mini-batches for training.
train_loader = torch.utils.data.DataLoader(
    dataset=train, batch_size=batch_size, shuffle=True, num_workers=1)
# The whole test split is served as a single batch; its size is taken from the
# dataset instead of the previously hard-coded 719.
test_loader = torch.utils.data.DataLoader(
    dataset=test, batch_size=len(test), shuffle=False, num_workers=1)

In [None]:
# Training loop for the plain LSTM
lstm.train()  # make sure the model is in training mode
total_step = 0
for epoch in range(num_epochs):  # was a literal 300, out of sync with the log message
    for images, labels in train_loader:
        # Cast once to the dtypes the model / loss expect and move to the device.
        images = images.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)

        # Forward pass
        outputs = lstm(images)
        loss = loss_function(outputs, labels)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_step += 1
        if total_step % 10 == 0:  # log every 10 optimisation steps (not every epoch)
            print("Epoch [{}/{}],step[{}] Loss:{:.4f}".format(epoch+1,num_epochs,total_step,loss.item()))

In [None]:
# Evaluation. Author's notes: deeper networks and other hyper-parameters were
# tried and gave roughly the same accuracy on this MNIST-like digits set;
# accuracy is also comparable to the ResNet variant, but the LSTM trains
# noticeably faster than the ResNet and converges better.
lstm.eval()  # inference mode
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)
        outputs = lstm(images)
        predicted = outputs.argmax(dim=1)
        # Accumulate across batches so the result stays correct even when the
        # test loader yields more than one batch.
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print('accuracy of the model on the test images: {}%'.format(100*correct/total))
#accuracy of the model on the test images: 97.63560500695411%

In [None]:
# CONV1D-LSTM model (many-to-one)
class CONV1D_LSTM(nn.Module):
    """Two pointwise Conv1d+BatchNorm+ReLU stages feeding a stacked LSTM classifier.

    Args:
        in_channel: channels of the input, which is laid out (batch, in_channel, length).
        out_channel: channels produced by both convolutions (the LSTM input size).
        hidden_size: width of each LSTM layer's hidden state.
        num_layers: number of stacked LSTM layers.
        num_classes: size of the output (logit) vector.
        device: accepted for backward compatibility only; weights are
            initialised in place and the LSTM states follow the input's device.
    """

    def __init__(self ,in_channel,out_channel, hidden_size, num_layers, num_classes,device):
        super(CONV1D_LSTM, self).__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.conv1 = nn.Conv1d(in_channels=in_channel, out_channels=out_channel,kernel_size=1, stride=1, bias=False)
        self.bn1 = nn.BatchNorm1d(out_channel)
        self.conv2 = nn.Conv1d(in_channels=out_channel, out_channels=out_channel,kernel_size=1, stride=1, bias=False)
        self.bn2 = nn.BatchNorm1d(out_channel)
        self.relu = nn.ReLU()
        # batch_first=True only affects the input/output layout of the LSTM
        self.lstm = nn.LSTM(out_channel, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

        for m in self.modules():
            if isinstance(m, nn.Conv1d):
                # Initialise the parameter itself. The previous
                # `m.weight.to(device)` returns a *copy* whenever `device`
                # differs from where the weight lives, so the initialisation
                # was silently discarded in that case.
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for x of shape (batch, in_channel, length)."""
        # Convolutional feature extractor
        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)
        out = self.relu(out)

        # Conv1d produces (batch, channel, length) while the batch-first LSTM
        # expects (batch, length, feature): swap the last two axes.
        out = torch.transpose(out, 2, 1)

        # Zero initial hidden and cell states, allocated on the same
        # device/dtype as the activations (no reliance on a global `device`).
        h0 = out.new_zeros(self.num_layers, out.size(0), self.hidden_size)
        c0 = out.new_zeros(self.num_layers, out.size(0), self.hidden_size)

        # Forward propagate through the LSTM stack
        out, (h_n, c_n) = self.lstm(out, (h0, c0))

        # Decode the top layer's output at the last time step
        out = self.fc(out[:, -1, :])
        return out

In [None]:
# Hyper-parameters for the Conv1D-LSTM model
sequence_length = 8  # time steps per sample (each 8x8 image is one sequence)
in_channel = 8  # input channels seen by the first Conv1d
out_channel=32  # channels produced by the convolutions (LSTM input size)
hidden_size = 128  # hidden state size
num_layers =  2 # number of stacked LSTM layers

num_classes = 10
batch_size = 256
num_epochs = 100
learning_rate = 0.01

# Prefer the second GPU as before, but fall back to the first GPU or the CPU
# so the notebook still runs on machines with fewer than two CUDA devices.
if torch.cuda.device_count() > 1:
    device = torch.device("cuda:1")
elif torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
# Build the Conv1D-LSTM classifier from the hyper-parameters above, then move
# its parameters to the selected device (the trailing expression also shows
# the module summary as the cell output).
conv1d_lstm = CONV1D_LSTM(
    in_channel=in_channel,
    out_channel=out_channel,
    hidden_size=hidden_size,
    num_layers=num_layers,
    num_classes=num_classes,
    device=device,
)
conv1d_lstm.to(device)

In [None]:
# Objective and optimiser for the Conv1D-LSTM:
# cross-entropy over the class logits, Adam over all model parameters.
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=conv1d_lstm.parameters(), lr=learning_rate)

In [None]:
# Training loop for the Conv1D-LSTM
conv1d_lstm.train()  # BatchNorm layers must use batch statistics while training
total_step = 0
for epoch in range(num_epochs):
    for images, labels in train_loader:
        # Cast once to the dtypes the model / loss expect and move to the device.
        images = images.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)

        # Forward pass
        outputs = conv1d_lstm(images)
        loss = loss_function(outputs, labels)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_step += 1
        if total_step % 10 == 0:  # log every 10 optimisation steps (not every epoch)
            print("Epoch [{}/{}],step[{}] Loss:{:.4f}".format(epoch+1,num_epochs,total_step,loss.item()))

In [None]:
# Evaluation. Author's note: accuracy is about the same as the plain LSTM.
# eval() switches the BatchNorm layers to their running statistics; without it
# the predictions depend on the composition of the test batch.
conv1d_lstm.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)
        outputs = conv1d_lstm(images)
        predicted = outputs.argmax(dim=1)
        # Accumulate across batches so the result stays correct even when the
        # test loader yields more than one batch.
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print('accuracy of the model on the test images: {}%'.format(100*correct/total))
#accuracy of the model on the test images: 98.19193324061196%

In [None]:
# ResNet-LSTM model (many-to-one). Author's note: compared with a simple
# CNN-LSTM, the residual connections help against vanishing gradients.
# First kind of ResNet building block.
class BasicBlock(nn.Module):
    """Basic 1-D residual block: two 3-wide convolutions plus a skip connection.

    Args:
        in_channel: channels of the block input.
        out_channel: channels produced by both convolutions.
        stride: stride of the first convolution (the second always uses 1).
        downsample: optional module applied to the input on the skip path so
            that it can be added to the convolution output; ``None`` adds the
            input unchanged.
    """

    # Channel multiplier read by the network builder as `channel * block.expansion`;
    # 1 means the block outputs exactly `out_channel` channels.
    expansion = 1

    def __init__(self, in_channel, out_channel, stride=1, downsample=None):
        super(BasicBlock, self).__init__()
        # kernel_size=3 with padding=1 keeps the length unchanged when stride=1.
        self.conv1 = nn.Conv1d(in_channel, out_channel, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm1d(out_channel)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv1d(out_channel, out_channel, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm1d(out_channel)
        self.downsample = downsample

    def forward(self, x):
        # Skip path: the raw input, or its projection when a downsample module is set.
        shortcut = x if self.downsample is None else self.downsample(x)

        # Residual path: conv -> bn -> relu -> conv -> bn
        residual = self.relu(self.bn1(self.conv1(x)))
        residual = self.bn2(self.conv2(residual))

        # Merge both paths, then apply the final activation.
        residual += shortcut
        return self.relu(residual)

In [None]:
# Assemble the residual blocks into a ResNet feature extractor feeding an LSTM
class ResNet1D_LSTM(nn.Module):
    """1-D ResNet (stem + three residual stages) followed by a stacked LSTM classifier.

    Args:
        block: residual block class; must expose an ``expansion`` attribute and
            accept ``(in_channel, out_channel, stride=..., downsample=...)``.
        blocks_num: number of blocks in each of the three stages.
        num_classes: size of the output (logit) vector.
        feature_channel: channels of the input, laid out (batch, feature_channel, length).
        hidden_size: width of each LSTM layer's hidden state.
        num_layers: number of stacked LSTM layers.
        device: accepted for backward compatibility only; weights are
            initialised in place and the LSTM states follow the input's device.
    """

    def __init__(self, block, blocks_num, num_classes, feature_channel,hidden_size, num_layers,device ):
        super(ResNet1D_LSTM, self).__init__()
        self.in_channel = 64  # running channel count, updated by _make_layer
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # Stem: strided convolution + max-pool
        self.conv1 = nn.Conv1d(in_channels=feature_channel, out_channels=self.in_channel, kernel_size=2, stride=2,
                               padding=0, bias=False)
        self.bn1 = nn.BatchNorm1d(self.in_channel)
        self.relu = nn.ReLU()
        # Roughly halves the sequence length (the channel count is unchanged)
        self.maxpool = nn.MaxPool1d(kernel_size=3, stride=2, padding=1)
        # Residual stages; the channel width doubles from stage to stage
        self.layer1 = self._make_layer(block, 64, blocks_num[0])
        self.layer2 = self._make_layer(block, 128, blocks_num[1], stride=2)  # stride=2 shrinks the incoming length
        self.layer3 = self._make_layer(block, 256, blocks_num[2], stride=2)
        # Not used by forward(); kept so the attribute remains available.
        # A 1-D adaptive pool takes a single output size (was the invalid (1, 1)).
        self.avgpool = nn.AdaptiveAvgPool1d(1)
        # batch_first=True only affects the input/output layout of the LSTM
        self.lstm = nn.LSTM(256, hidden_size, num_layers, batch_first=True)
        self.fc = nn.Linear(hidden_size, num_classes)

        for m in self.modules():
            # The network is built from Conv1d layers; the previous Conv2d
            # check never matched, so this initialisation never ran. The
            # weight is initialised in place (a `.to(device)` copy would be
            # discarded).
            if isinstance(m, nn.Conv1d):
                nn.init.kaiming_normal_(m.weight, mode='fan_out', nonlinearity='relu')

    def _make_layer(self, block, channel, block_num, stride=1):
        """Build one stage of ``block_num`` blocks producing ``channel * block.expansion`` channels."""
        downsample = None
        # The skip path needs a projection whenever the first block changes
        # the length (stride) or the channel count.
        if stride != 1 or self.in_channel != channel * block.expansion:
            downsample = nn.Sequential(
                nn.Conv1d(self.in_channel, channel * block.expansion, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm1d(channel * block.expansion))

        layers = []
        # Only the first block of a stage applies the stride / projection.
        layers.append(block(self.in_channel, channel, downsample=downsample, stride=stride))
        self.in_channel = channel * block.expansion

        for _ in range(1, block_num):
            layers.append(block(self.in_channel, channel))

        return nn.Sequential(*layers)

    def forward(self, x):
        """Return logits of shape (batch, num_classes) for x of shape (batch, feature_channel, length)."""
        x = self.conv1(x)  # stem
        x = self.bn1(x)
        x = self.relu(x)
        x = self.maxpool(x)

        x = self.layer1(x)  # residual stages
        x = self.layer2(x)
        x = self.layer3(x)

        # Conv1d produces (batch, channel, length) while the batch-first LSTM
        # expects (batch, length, feature): swap the last two axes.
        x = torch.transpose(x, 2, 1)

        # Zero initial hidden and cell states, allocated on the same
        # device/dtype as the activations (no reliance on a global `device`).
        h0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)
        c0 = x.new_zeros(self.num_layers, x.size(0), self.hidden_size)

        # Forward propagate through the LSTM stack
        out, (h_n, c_n) = self.lstm(x, (h0, c0))

        # Decode the top layer's output at the last time step
        out = self.fc(out[:, -1, :])
        return out

In [None]:
# Hyper-parameters for the ResNet-LSTM model
sequence_length = 8  # time steps per sample (each 8x8 image is one sequence)
feature_channel=8  # input channels seen by the first Conv1d
hidden_size = 512  # hidden state size
num_layers =  2 # number of stacked LSTM layers

num_classes = 10
batch_size = 256
num_epochs = 600
learning_rate = 0.001

# Prefer the second GPU as before, but fall back to the first GPU or the CPU
# so the notebook still runs on machines with fewer than two CUDA devices.
if torch.cuda.device_count() > 1:
    device = torch.device("cuda:1")
elif torch.cuda.is_available():
    device = torch.device("cuda:0")
else:
    device = torch.device("cpu")

In [None]:
# Build the ResNet-LSTM with 3/4/3 basic blocks in its three stages. The
# constructor is handed a CPU device; the finished model is then moved to the
# selected device (the trailing expression also shows the module summary).
resnet_lstm = ResNet1D_LSTM(
    BasicBlock,
    [3, 4, 3],
    num_classes=num_classes,
    feature_channel=feature_channel,
    hidden_size=hidden_size,
    num_layers=num_layers,
    device=torch.device("cpu"),
)
resnet_lstm.to(device)

In [None]:
# Objective and optimiser for the ResNet-LSTM:
# cross-entropy over the class logits, Adam over all model parameters.
loss_function = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=resnet_lstm.parameters(), lr=learning_rate)

In [None]:
# Training loop for the ResNet-LSTM. Author's notes: compared with the previous
# models, resnet_lstm needs many more epochs to train successfully, and the
# number of epochs needed also depends on network depth (vanishing gradients);
# the architecture may need further refinement.
resnet_lstm.train()  # BatchNorm layers must use batch statistics while training
total_step = 0
for epoch in range(num_epochs):
    for images, labels in train_loader:
        # Cast once to the dtypes the model / loss expect and move to the device.
        images = images.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)

        # Forward pass
        outputs = resnet_lstm(images)
        loss = loss_function(outputs, labels)

        # Backward pass and parameter update
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_step += 1
        if total_step % 10 == 0:  # log every 10 optimisation steps (not every epoch)
            print("Epoch [{}/{}],step[{}] Loss:{:.4f}".format(epoch+1,num_epochs,total_step,loss.item()))

In [None]:
# Evaluation. Author's note: accuracy is about the same as the plain LSTM.
# eval() switches the BatchNorm layers to their running statistics; without it
# the predictions depend on the composition of the test batch.
resnet_lstm.eval()
correct = 0
total = 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device, dtype=torch.float32)
        labels = labels.to(device, dtype=torch.long)
        outputs = resnet_lstm(images)
        predicted = outputs.argmax(dim=1)
        # Accumulate across batches so the result stays correct even when the
        # test loader yields more than one batch.
        total += labels.size(0)
        correct += (predicted == labels).sum().item()
print('accuracy of the model on the test images: {}%'.format(100*correct/total))

In [None]:
#4层resnet（feature：64->128->256->512） + lstm 测试结果：
#100epoch:0.9055
#600epoch:0.9735
#700epoch:0.9735
#800epoch:0.9791
#900epoch:0.9735
#1000epoch:0.9596
#3层resnet（feature：64->128->256） + lstm 测试结果：
#100epoch:0.94
#600epoch:0.9624
#700epoch:0.9638
#800epoch:0.9652
#900epoch:0.9694
#1000epoch:0.9666