In [1]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
%matplotlib inline
# Plot text configuration: use the SimHei font family for sans-serif text
# (so CJK labels can be drawn) and turn off the Unicode minus glyph for axes.
plt.rcParams.update({
    'font.sans-serif': ['SimHei'],
    'axes.unicode_minus': False,
})

import torch
import torch.nn as nn
import datetime as dt
import time as time

import warnings
warnings.filterwarnings('ignore')

# Seed every random number generator used here (NumPy, PyTorch CPU, PyTorch CUDA)
# with the same value so that repeated runs produce the same results.
seed = 4444
for _seed_fn in (np.random.seed, torch.manual_seed, torch.cuda.manual_seed):
    _seed_fn(seed)
import os
import time

from torch.utils.data import TensorDataset,Dataset,DataLoader,random_split

import models

In [None]:
# Load the ship-motion dataset
df = pd.read_csv("data/ship/ship_motion.csv")

# Number of missing values in each column
null_counts = df.isnull().sum()

# Report every column that contains missing values. The "no missing values"
# message is printed only when *all* columns are clean (the previous loop
# stopped at the first clean column and could miss later columns with nulls).
columns_with_nulls = null_counts[null_counts > 0]
for column, count in columns_with_nulls.items():
    print(f"列 '{column}' 有 {count} 个空值.")
if columns_with_nulls.empty:
    print("未发现空值")

# Preview of the first rows (only rendered when this is the last expression of a cell)
df.head()


class config:
    """Hyper-parameters shared by the data preparation and the model code."""

    # --- sliding-window geometry ---
    L = 6  # input window length (passed as `input_size` to univariate_data)
    H = 1  # forecast horizon (passed as `output_size`; also the width of the LSTM's linear head)

    # --- data loading / optimisation ---
    batch_size = 32        # mini-batch size used by the training DataLoaders
    learning_rate = 1e-3   # optimizer learning rate (matches the 0.001 given to Adam)

    # --- network dimensions ---
    input_size = 3     # features per time step fed to nn.LSTM (Heave, Pitch, Roll)
    # The three values below are not read by the code visible in this notebook
    # section (the LSTM class uses its own 64 hidden units / 2 layers).
    # TODO confirm whether `models` or later cells rely on them.
    hidden_size = 64
    num_layers = 6
    out_channels = 9


# Sliding-window helper
def univariate_data(sequence, input_size, output_size, step):
    """Cut a series into sliding (input window, target window) pairs.

    Window ``k`` starts at index ``k * step``. Its input is the next
    ``input_size`` samples and its target is the ``output_size`` samples that
    immediately follow the input.

    Args:
        sequence: array-like of samples (NumPy array, list, pandas Series, ...).
            Each input window must contain exactly ``input_size`` elements,
            i.e. the series is expected to be one-dimensional.
        input_size: number of samples in each input window.
        output_size: number of samples in each target window.
        step: stride between the starts of consecutive windows; must be positive.

    Returns:
        Tuple ``(inputs, outputs)``: ``inputs`` has shape
        ``(n_windows, input_size, 1)`` and ``outputs`` has shape
        ``(n_windows, output_size)`` for a one-dimensional ``sequence``.

    Raises:
        ValueError: if ``step`` is not positive, or if ``sequence`` is shorter
            than ``input_size + output_size`` so that no window fits.
    """
    # Accept any array-like: slices of plain lists / Series have no ``.reshape``.
    sequence = np.asarray(sequence)

    if step <= 0:
        raise ValueError(f"step must be a positive integer, got {step}")

    last_start = len(sequence) - input_size - output_size
    if last_start < 0:
        # Fail with an explicit message instead of the opaque
        # "need at least one array to concatenate" raised by np.concatenate.
        raise ValueError(
            f"sequence of length {len(sequence)} is too short for "
            f"input_size={input_size} and output_size={output_size}"
        )

    starts = range(0, last_start + 1, step)
    inputs = [sequence[i:i + input_size].reshape((1, input_size, 1)) for i in starts]
    outputs = [sequence[i + input_size:i + input_size + output_size] for i in starts]
    return np.concatenate(inputs, axis=0), np.array(outputs)


# ---- Training data ----
# Target series: the first 6096 'Heave' samples, min-max scaled with the
# extrema of that same slice.
train_x = df['Heave'].values.astype(np.float32)[:6096]
norm_train_x = (train_x - train_x.min()) / (train_x.max() - train_x.min())
train_seq, train_label = univariate_data(
    norm_train_x,
    input_size=config.L,
    output_size=config.H,
    step=1,
)

# Input features: every column is sliced, scaled and windowed independently,
# giving one (n_windows, L, 1) array per feature.
feature_columns = ['Heave', 'Pitch', 'Roll']
feature_windows = []
for column in feature_columns:
    train_xx = df[column].values.astype(np.float32)[:6096]
    norm_train_xx = (train_xx - train_xx.min()) / (train_xx.max() - train_xx.min())
    train_seqx, _ = univariate_data(
        norm_train_xx,
        input_size=config.L,
        output_size=config.H,
        step=1,
    )
    feature_windows.append(train_seqx)

# Stack the per-feature windows along the last axis -> (n_windows, L, n_features)
trainX = np.concatenate(feature_windows, axis=2)

# Training labels: the look-ahead 'Heave' targets produced by the windowing above
trainY = train_label

# Test inputs
def preprocess_data(df, columns, input_size, output_size,
                    start=6096, end=6222, label_column='Heave'):
    """Build windowed model inputs and aligned targets for a slice of ``df``.

    Every column is cut to rows ``start:end``, cast to float32, min-max scaled
    with the extrema of that slice, and windowed with ``univariate_data``
    (stride 1).

    Note: the scaling uses the slice's own min/max, independently of whatever
    scaling was applied to other slices of the data.

    Args:
        df: DataFrame holding the feature columns and the label column.
        columns: names of the feature columns, in the order they should appear
            along the last axis of the returned input array.
        input_size: number of time steps in each input window.
        output_size: number of time steps in each target window.
        start: first row (inclusive) of the slice; defaults to 6096.
        end: last row (exclusive) of the slice; defaults to 6222.
        label_column: column whose future values are the prediction target.

    Returns:
        Tuple ``(input_data, label_seq, test_x)``:
        ``input_data`` - array of shape ``(n_windows, input_size, len(columns))``;
        ``label_seq`` - scaled targets of shape ``(n_windows, output_size)``,
        where row ``i`` holds the samples that directly follow input window ``i``;
        ``test_x`` - the raw (unscaled) label slice, needed to undo the scaling.
    """
    def _min_max(values):
        # Scale to [0, 1] using the extrema of the given slice.
        return (values - np.min(values)) / (np.max(values) - np.min(values))

    # One (n_windows, input_size, 1) array per feature column
    feature_seqs = []
    for column in columns:
        feature = df[column].values.astype(np.float32)[start:end]
        feature_seq, _ = univariate_data(_min_max(feature), input_size=input_size,
                                         output_size=output_size, step=1)
        feature_seqs.append(feature_seq)

    # Stack the features along the last axis
    input_data = np.concatenate(feature_seqs, axis=2)

    # Label series (raw values are returned as well for de-normalisation)
    test_x = df[label_column].values.astype(np.float32)[start:end]
    # univariate_data returns (input windows, target windows). The targets are
    # the SECOND value; taking the first one would hand back the input windows,
    # so the "true" values would lag the forecast by `input_size` steps.
    _, label_seq = univariate_data(_min_max(test_x), input_size=input_size,
                                   output_size=output_size, step=1)

    return input_data, label_seq, test_x

# Feature columns and window sizes used for the test split
feature_columns = ['Heave', 'Pitch', 'Roll']
input_size, output_size = config.L, config.H

# Build the windowed test inputs, the test labels and the raw label slice
testX, testY, test_x = preprocess_data(
    df,
    columns=feature_columns,
    input_size=input_size,
    output_size=output_size,
)

class LSTM(nn.Module):
    """Stacked LSTM followed by a linear head that emits the forecast.

    Args:
        input_size: number of features per time step. Defaults to
            ``config.input_size`` when omitted.
        output_size: number of forecast steps produced by the linear head.
            Defaults to ``config.H`` when omitted.
        hidden_size: width of the LSTM hidden state (default 64).
        num_layers: number of stacked LSTM layers (default 2).

    Calling ``LSTM()`` without arguments builds the same architecture as
    before, so existing checkpoints (``lstm.*`` / ``linear.*`` keys) still load.
    """

    def __init__(self, input_size=None, output_size=None, hidden_size=64, num_layers=2):
        super(LSTM, self).__init__()
        # Fall back to the notebook-wide configuration when sizes are not given
        if input_size is None:
            input_size = config.input_size
        if output_size is None:
            output_size = config.H

        # Remembered so that forward() sizes the initial states consistently
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size,
                            num_layers=num_layers, batch_first=True)
        # Map the last hidden state (hidden_size) to the forecast horizon
        self.linear = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the forecast for a batch ``x`` of shape ``(B, seq_len, input_size)``.

        The result has shape ``(B, output_size)``.
        """
        # Zero initial states created with x's device and dtype, so the model
        # also works after .to(device) / .double() (torch.zeros would always
        # give a default-dtype CPU tensor).
        state_shape = (self.num_layers, x.size(0), self.hidden_size)
        h_0 = x.new_zeros(state_shape)
        c_0 = x.new_zeros(state_shape)

        _, (h_out, _) = self.lstm(x, (h_0, c_0))
        # h_out is [num_layers, B, hidden_size]; keep the top layer -> [B, hidden_size]
        h_out = h_out[-1]
        return self.linear(h_out)

# ---- Training ----
X_train = torch.from_numpy(trainX)
Y_train = torch.from_numpy(trainY)

# Two unshuffled loaders with identical batch_size / drop_last yield aligned
# (input, label) batches when zipped.
seq_loader = DataLoader(X_train, batch_size=config.batch_size, drop_last=True)
label_loader = DataLoader(Y_train, batch_size=config.batch_size, drop_last=True)

model = LSTM()
criterion = nn.MSELoss(reduction='mean')  # loss function
optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)

# Make sure the checkpoint directory exists *before* the long training run,
# so the final torch.save cannot fail on a missing folder.
checkpoint_path = "Model_save/LSTM.pth"
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)

num_epochs = 200

print("Training process initializing .....\n")
start = time.time()

model.train()
for e in range(num_epochs):
    total_loss = 0.
    for seq, label in zip(seq_loader, label_loader):
        # NOTE: add `label = label.squeeze(2)` here if the min-max normalisation function is used
        output = model(seq)              # forward pass on one mini-batch

        optimizer.zero_grad()            # clear gradients from the previous step
        loss = criterion(output, label)  # loss between prediction and target
        loss.backward()                  # back-propagate
        optimizer.step()                 # update the parameters
        # Accumulate a plain float: adding the tensor itself would keep every
        # batch's autograd graph alive until the end of the epoch.
        total_loss += loss.item()

    if e % 10 == 0:
        print('[Epoch {}] ,loss: {: .6f}'.format(e+1,total_loss))

end = time.time()
print("\nFinish Training")
print("Training Time ", end - start)

torch.save(model.state_dict(), checkpoint_path)  # save the trained weights

# ---- Evaluation on the test slice ----
X_test = torch.from_numpy(testX)
Y_test = torch.from_numpy(testY)

# Restore the saved weights and switch to inference mode
model.load_state_dict(torch.load("Model_save/LSTM.pth"))
model.eval()

# Generate predictions; no_grad avoids building an autograd graph for inference
with torch.no_grad():
    prediction = model(X_test)
result = prediction.detach()

# First forecast step, mapped back to the original scale of the label slice
result = result[:,0]
forecast = (np.max(test_x)-np.min(test_x))*result+np.min(test_x)  # undo min-max scaling

# Ground truth, mapped back to the original scale in the same way
Y_test0 = Y_test[:,0]
original = (np.max(test_x)-np.min(test_x))*Y_test0+np.min(test_x)  # undo min-max scaling

# ---- Metrics ----
from sklearn.metrics import mean_absolute_error,mean_squared_error,r2_score
r2 = r2_score(original,forecast)
print(r2)
mae = mean_absolute_error(original,forecast)
print(mae)
mse = mean_squared_error(original,forecast)
print(mse)
# RMSE derived from MSE: the `squared=False` argument of mean_squared_error is
# deprecated in scikit-learn 1.4 and removed in 1.6.
rmse = np.sqrt(mse)
print(rmse)