In [2]:
import numpy as np
import torch
from torch import nn
from torch.autograd import Variable
from mpl_toolkits.mplot3d import Axes3D

import matplotlib.pyplot as plt
#import tensorwatch as tw

In [3]:
# Train Data load
race_data = np.load('race_data_mit_222010103.npz')
swa_cmd = race_data['swa_cmd']
tho_cmd = race_data['tho_cmd']
brk_cmd = race_data['brk_cmd']
vx = race_data['vx']
vy = race_data['vy']
ax = race_data['ax']
ay = race_data['ay']

data = np.stack((ax,vx,tho_cmd,brk_cmd,swa_cmd),axis=0) # 总数据5维
data = np.transpose(data)
print(data.shape)

(6355, 5)


In [4]:
#多维归一化  返回数据和最大最小值
def NormalizeMult(data):
    data = np.array(data)
    normalize = np.arange(2*data.shape[1],dtype='float64')
    normalize = normalize.reshape(data.shape[1],2)
    for i in range(0,data.shape[1]): #第i列  
        listi = data[:,i]
        listlow,listhigh =  np.percentile(listi, [0, 100])
        # print(i)
        normalize[i,0] = listlow
        normalize[i,1] = listhigh
        delta = listhigh - listlow
        if delta != 0: #第j行
            for j in range(0,data.shape[0]):
                data[j,i]  =  (data[j,i] - listlow)/delta
        #np.save("./normalize.npy",normalize)
    return  data,normalize

data_norm,normalize = NormalizeMult(data)
print(data_norm.shape)
print(normalize.shape)

(6355, 5)
(5, 2)


In [5]:
def create_dataset(data_norm, look_back):
    ratio = 0.8
    # 创建训练集
    data_X, data_Y = [],[]
    for i in range(len(data_norm)-look_back):
        a = data_norm[i:(i+look_back)]
        data_X.append(a)
        data_Y.append(data_norm[i+look_back,0]) # inly 
    # 划分训练集和测试集
    train_size = int(len(data_X)*ratio)
    #print(train_size)
    test_size = len(data_X) - train_size
    train_X = data_X[:train_size]
    train_Y = data_Y[:train_size]
    test_X = data_X[train_size:]
    test_Y = data_Y[train_size:]
    
    return np.array(train_X),np.array(train_Y),np.array(test_X), np.array(test_Y), data_X,data_Y


In [6]:
tr_x, tr_y,te_x,te_y,data_X, data_Y = create_dataset(data_norm,10)

print(te_x.shape)  # 训练集
print(te_y.shape) # 测试集
print(len(data_X))
print(len(data_X[0]))
print(len(data_X[0][0]))

(1269, 10, 5)
(1269,)
6345
10
5


In [7]:
# LSTM网络包装函数
class RegLSTM(nn.Module):
    def __init__(self, inp_dim, out_dim, mid_dim, mid_layers):
        super(RegLSTM, self).__init__()

        self.rnn = nn.LSTM(inp_dim, mid_dim, mid_layers)  # rnn
        self.reg = nn.Sequential(
            nn.Linear(mid_dim, mid_dim),
            nn.Tanh(),
            nn.Linear(mid_dim, out_dim),
        )  # regression
        #self.reg = nn.Linear(mid_dim,out_dim)

        # x = feature
    def forward(self, x):
        y = self.rnn(x)[0]  # y, (h, c) = self.rnn(x)

        seq_len, batch_size, hid_dim = y.shape
        y = y.view(seq_len*batch_size, hid_dim)
        y = self.reg(y)
        y = y.view(seq_len, batch_size, -1)
        return y

    """
    PyCharm Crtl+click nn.LSTM() jump to code of PyTorch:
    Examples::
        >>> rnn = nn.LSTM(10, 20, 2)
        >>> input = torch.randn(5, 3, 10)
        >>> h0 = torch.randn(2, 3, 20)
        >>> c0 = torch.randn(2, 3, 20)
        >>> output, (hn, cn) = rnn(input, (h0, c0))
    """

    def output_y_hc(self, x, hc):
        y, hc = self.rnn(x, hc)  # y, (h, c) = self.rnn(x)

        seq_len, batch_size, hid_dim = y.size()
        y = y.view(-1, hid_dim)
        y = self.reg(y)
        y = y.view(seq_len, batch_size, -1)
        return y, hc

# Trainning

In [8]:
inp_dim = 10 #输入维度，指特征向量的长度,即上文load_data()中的look_back
out_dim = 1 # 输出维度1 
mod_dir = '.'

'''load data'''
ratio = 0.8
look_back = inp_dim
train_x,train_y,test_x,test_y,data_X,data_Y = create_dataset(data_norm,look_back)
train_size = int(len(data_X)*ratio)

#需要将数据改变一下形状，因为 RNN 读入的数据维度是 (seq, batch, feature)，
#所以要重新改变一下数据的维度，这里只有一个序列，所以 batch 是 1，
# feature是一个seq里有几个数，这里是2

train_x = train_x.reshape((-1, 5, look_back)) # change from (N,3) to (N,1,3)
train_y = train_y.reshape((-1, 1, out_dim))
test_x = test_x.reshape((-1, 5,look_back))
test_y = test_y.reshape((-1, 1, out_dim))

'''build model'''
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = RegLSTM(inp_dim, out_dim, mid_dim=4, mid_layers=2).to(device) # mid_dim即隐藏层维度
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(net.parameters(), lr=1e-2)

'''train'''
var_x = torch.tensor(train_x, dtype=torch.float32, device=device)
var_y = torch.tensor(train_y, dtype=torch.float32, device=device)
#var_x = Variable(torch.from_numpy(train_x))
#var_y = Variable(torch.from_numpy(train_y))
print('var_x.size():', var_x.size())
print('var_y.size():', var_y.size())

for e in range(1024):
    out = net(var_x)
    loss = criterion(out, var_y)

    optimizer.zero_grad()
    loss.backward()
    optimizer.step()

    if (e + 1) % 100 == 0:  # 每 100 次输出结果
        print('Epoch: {}, Loss: {:.5f}'.format(e + 1, loss.item()))
print('Model is ready!')



var_x.size(): torch.Size([5076, 5, 10])
var_y.size(): torch.Size([5076, 1, 1])


  return F.mse_loss(input, target, reduction=self.reduction)


Epoch: 100, Loss: 0.00628
Epoch: 200, Loss: 0.00448
Epoch: 300, Loss: 0.00404
Epoch: 400, Loss: 0.00389
Epoch: 500, Loss: 0.00379
Epoch: 600, Loss: 0.00372
Epoch: 700, Loss: 0.00369
Epoch: 800, Loss: 0.00367
Epoch: 900, Loss: 0.00365
Epoch: 1000, Loss: 0.00359
Model is ready!


In [9]:
# 保存训练好的网络
torch.save(net.state_dict(), '{}/race_lstm.pth'.format(mod_dir))

# Evaluation

In [22]:
# 传统预测，给定输入后，基于过去的n帧状态，预测之后的1帧状态，全量预测
net  = net.eval()
net  = net.double()
data_X = np.array(data_X).reshape(-1,5,inp_dim)
data_X = torch.from_numpy(data_X)
print(data_X.shape)
var_data = Variable(data_X)
pred_res = net(var_data)
print(pred_res.shape)

%matplotlib auto
# %matplotlib inline
#pred_test = pred_res.view(-1).data.numpy()
pred_test = []
cout = 0
for i in pred_res:
    pred_test.append(i[-1])
    #plt.plot(range(cout, cout+5), i.detach().numpy()) # 显示所有预测值
    cout = cout +1 
pred_test  = np.array(pred_test)

    
plt.plot(pred_test,'r',label = 'pred', linewidth=1.5)
plt.plot(np.array(data_Y), 'b', label = 'real')
plt.plot([train_size, train_size], [0, 1], label='train | predict')
plt.ylabel('Acc[m/s^2]')
plt.title('Validate Model on Trainning Lap')
plt.legend()

torch.Size([15830, 5, 10])
torch.Size([15830, 5, 1])
Using matplotlib backend: TkAgg


<matplotlib.legend.Legend at 0x7fb0545fce10>

# 迁移学习验证
#### 使用数据集A训练的模型，在数据集B上进行预测，对比数据集B内真值和预测值的差异。

In [23]:
# Validate Data load
race_data = np.load('race_data_mit_223010103.npz')
swa_cmd = race_data['swa_cmd']
tho_cmd = race_data['tho_cmd']
brk_cmd = race_data['brk_cmd']
vx = race_data['vx']
vy = race_data['vy']
ax = race_data['ax']
ay = race_data['ay']

data_vil = np.stack((ax,vx,tho_cmd,brk_cmd,swa_cmd),axis=0) # 总数据5维
data_vil = np.transpose(data_vil)
print(data_vil.shape)

data_norm_vil,normalize = NormalizeMult(data_vil)

'''load data'''
ratio = 0.95
train_x,train_y,test_x,test_y,data_X,data_Y = create_dataset(data_norm_vil,look_back)
train_size = int(len(data_X)*ratio)

# 模型验证
net  = net.eval()
net  = net.double()
data_X = np.array(data_X).reshape(-1,5,inp_dim)
data_X = torch.from_numpy(data_X)
print(data_X.shape)
var_data = Variable(data_X)
pred_res = net(var_data)
print(pred_res.shape)

%matplotlib auto
# %matplotlib inline
#pred_test = pred_res.view(-1).data.numpy()
pred_test = []
cout = 0
for i in pred_res:
    pred_test.append(i[-1])
    #plt.plot(range(cout, cout+5), i.detach().numpy()) # 显示所有预测值
    cout = cout +1 
pred_test  = np.array(pred_test)

    
plt.plot(pred_test,'r',label = 'pred', linewidth=1.5)
plt.plot(np.array(data_Y), 'b', label = 'real')
plt.plot([train_size, train_size], [0, 1], label='train | predict')
plt.ylabel('Acc[m/s^2]')
plt.title('Validate Model on Other Lap')
plt.legend()

(7977, 5)
torch.Size([7967, 5, 10])
torch.Size([7967, 5, 1])
Using matplotlib backend: TkAgg


<matplotlib.legend.Legend at 0x7fb074605f98>