In [23]:
!git clone https://github.com/jiaxiang-cheng/PyTorch-LSTM-for-RUL-Prediction

fatal: destination path 'PyTorch-LSTM-for-RUL-Prediction' already exists and is not an empty directory.


In [24]:
import os

# 打印当前工作目录
print("当前工作目录：", os.getcwd())


# 如果需要，更改工作目录
os.chdir('/content/PyTorch-LSTM-for-RUL-Prediction')


# 列出当前工作目录下的所有文件和文件夹
print(os.listdir('.'))



当前工作目录： /content/PyTorch-LSTM-for-RUL-Prediction
['requirements.txt', '.git', '.idea', '.DS_Store', 'PyTorch-LSTM-for-RUL-Prediction', 'CMAPSSData', '_trials', 'loading_data.pyc', 'LICENSE', '__pycache__', 'main.py', 'README.md', 'visualize.py', 'model.py', 'loading_data.py', '.gitignore']


In [14]:
"""Loading data sets"""
import pandas as pd

def add_rul_1(df):
    """
    :param df: 原始数据框架
    :return: 标记目标的数据框架
    """
    # 对每个单元获取总循环次数
    grouped_by_unit = df.groupby(by="unit_nr")
    max_cycle = grouped_by_unit["time_cycles"].max()
    print(max_cycle)
    # 将最大循环次数合并回原始框架
    result_frame = df.merge(max_cycle.to_frame(name='max_cycle'), left_on='unit_nr', right_index=True)
    # 计算每行的剩余使用寿命（线性片段）
    remaining_useful_life = result_frame["max_cycle"] - result_frame["time_cycles"]
    result_frame["RUL"] = remaining_useful_life
    # 删除不再需要的最大循环列
    result_frame = result_frame.drop("max_cycle", axis=1)
    return result_frame

def load_FD001(cut):
    """
    :param cut: 目标RUL的上限
    :return: 按样本分组的数据
    """
    # 定义文件路径
    dir_path = './CMAPSSData/'
    # 定义列名便于索引
    index_names = ['unit_nr', 'time_cycles']
    setting_names = ['setting_1', 'setting_2', 'setting_3']
    sensor_names = ['s_{}'.format(i) for i in range(1, 22)]
    col_names = index_names + setting_names + sensor_names
    # 读取数据
    train = pd.read_csv((dir_path + 'train_FD001.txt'), sep='\s+', header=None, names=col_names)
    test = pd.read_csv((dir_path + 'test_FD001.txt'), sep='\s+', header=None, names=col_names)
    y_test = pd.read_csv((dir_path + 'RUL_FD001.txt'), sep='\s+', header=None, names=['RUL'])
    # 基于EDA去除非信息特征
    drop_sensors = ['s_1', 's_5', 's_6', 's_10', 's_14', 's_16', 's_18', 's_19']
    drop_labels = setting_names + drop_sensors
    train.drop(labels=drop_labels, axis=1, inplace=True)

    # 选择训练数据中的索引列（单元号和循环次数）
    title = train.iloc[:, 0:2]
    # 选择训练数据中的数据列
    data = train.iloc[:, 2:]

    # 对数据进行最小-最大规范化
    data_norm = (data - data.min()) / (data.max() - data.min())  # 最小-最大规范化
    train_norm = pd.concat([title, data_norm], axis=1)

    # 添加剩余使用寿命(RUL)列
    train_norm = add_rul_1(train_norm)

    # 按单元号分组
    group = train_norm.groupby(by="unit_nr")

    # 对测试数据重复上述删除特征和规范化的过程
    test.drop(labels=drop_labels, axis=1, inplace=True)
    title = test.iloc[:, 0:2]
    data = test.iloc[:, 2:]
    data_norm = (data - data.min()) / (data.max() - data.min())
    test_norm = pd.concat([title, data_norm], axis=1)

    # 按单元号对测试数据分组
    group_test = test_norm.groupby(by="unit_nr")

    # 返回处理后的训练数据组、测试数据组和测试数据的真实RUL值
    return group, group_test, y_test


In [15]:
import torch
import torch.nn as nn
from torch.autograd import Variable

class LSTM1(nn.Module):
    """LSTM architecture"""
    # 初始化函数定义模型的基本结构
    def __init__(self, input_size, hidden_size, num_layers, seq_length=1):
        super(LSTM1, self).__init__()
        self.input_size = input_size  # 输入层大小
        self.hidden_size = hidden_size  # 隐藏层大小
        self.num_layers = num_layers  # LSTM层数
        self.seq_length = seq_length  # 序列长度

        # 定义LSTM层
        self.lstm = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=num_layers, batch_first=True,
                            dropout=0.1)
        # 定义全连接层
        self.fc_1 = nn.Linear(hidden_size, 16)  # 第一全连接层，16个节点
        self.fc_2 = nn.Linear(16, 8)  # 第二全连接层，8个节点
        self.fc = nn.Linear(8, 1)  # 输出层

        # 定义Dropout和ReLU激活函数
        self.dropout = nn.Dropout(0.1)
        self.relu = nn.ReLU()

    # 前向传播函数
    def forward(self, x):
        """
        :param x: 输入特征
        :return: 预测结果
        """
        # 初始化隐藏状态和内部状态
        h_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))
        c_0 = Variable(torch.zeros(self.num_layers, x.size(0), self.hidden_size))

        # LSTM层输出
        output, (hn, cn) = self.lstm(x, (h_0, c_0))

        # 选择最后一层的隐藏状态
        hn_o = torch.Tensor(hn.detach().numpy()[-1, :, :])
        hn_o = hn_o.view(-1, self.hidden_size)
        hn_1 = torch.Tensor(hn.detach().numpy()[1, :, :])
        hn_1 = hn_1.view(-1, self.hidden_size)

        # 应用全连接层和激活函数
        out = self.relu(self.fc_1(self.relu(hn_o + hn_1)))
        out = self.relu(self.fc_2(out))
        out = self.dropout(out)
        out = self.fc(out)
        return out


In [22]:
# 函数 testing_function
# 定义了一个用于测试模型的函数，接收参数为测试集数量和测试数据分组。
# 在一个循环中，对每个测试样本进行预测，并计算预测值和真实值之间的均方根误差（RMSE）。
# 使用torch.Tensor将测试数据转换为张量，并通过模型进行预测。
# 计算每个预测值与实际值的误差，累加后计算RMSE。
# 返回所有测试样本的预测结果和计算出的RMSE。
# 函数 train
# 接收初始化后的模型、训练集样本数量和训练数据分组作为参数。
# 定义了一个循环，用于执行多个训练周期（epochs）。
# 在每个epoch内部，使用train()方法设置模型到训练模式。
# 遍历所有训练样本，将数据转换为张量，执行前向传播，计算损失，进行反向传播，并更新模型权重。
# 每个epoch结束时，调用测试函数评估模型性能，打印损失和RMSE，如果模型在测试集上的性能没有改进，则提前终止训练。
# 返回训练过程中得到的最佳结果和对应的RMSE。




"""RUL Prediction with LSTM"""
# from loading_data import *
# from model import *
# from visualize import *
import numpy as np

N_HIDDEN = 96  # NUMBER OF HIDDEN STATES
N_LAYER = 4  # NUMBER OF LSTM LAYERS
N_EPOCH = 150  # NUM OF EPOCHS
MAX = 135  # UPPER BOUND OF RUL
LR = 0.01  # LEARNING RATE


def testing_function(num, group_for_test):
    rmse_test, result_test = 0, list()  # 初始化RMSE为0，result_test为空列表，用于存储测试结果。

    for ite in range(1, num + 1):  # 遍历测试数据集中的每一个样本。
        X_test = group_for_test.get_group(ite).iloc[:, 2:]  # 从测试数据中获取特征数据。
        X_test_tensors = Variable(torch.Tensor(X_test.to_numpy()))  # 将测试数据转换为PyTorch张量。
        X_test_tensors = torch.reshape(X_test_tensors, (X_test_tensors.shape[0], 1, X_test_tensors.shape[1]))  # 调整张量形状以适应模型输入。

        test_predict = model.forward(X_test_tensors)  # 使用模型进行前向传播得到预测结果。
        data_predict = max(test_predict[-1].detach().numpy(), 0)  # 获取最后一次的预测值，并确保其不小于0。
        result_test.append(data_predict)  # 将预测结果添加到结果列表中。
        rmse_test = np.add(np.power((data_predict - y_test.to_numpy()[ite - 1]), 2), rmse_test)  # 计算RMSE。

    rmse_test = (np.sqrt(rmse_test / num)).item()  # 计算最终的RMSE值。
    return result_test, rmse_test  # 返回所有测试样本的预测结果和RMSE。

def train(model_for_train, ntrain, group_for_train):
    rmse_temp = 100  # 初始化临时RMSE为100，作为性能比较的基准。

    for epoch in range(1, N_EPOCH + 1):  # 进行多个训练周期的迭代。

        model_for_train.train()  # 将模型设置为训练模式。
        epoch_loss = 0  # 初始化该周期的损失总和为0。

        for i in range(1, ntrain + 1):  # 遍历训练集中的每一个样本。
            X, y = group_for_train.get_group(i).iloc[:, 2:-1], group_for_train.get_group(i).iloc[:, -1:]  # 提取特征和目标变量。
            X_train_tensors = Variable(torch.Tensor(X.to_numpy()))  # 将特征数据转换为张量。
            y_train_tensors = Variable(torch.Tensor(y.to_numpy()))  # 将目标数据转换为张量。
            X_train_tensors = torch.reshape(X_train_tensors, (X_train_tensors.shape[0], 1, X_train_tensors.shape[1]))  # 调整张量形状。

            outputs = model_for_train(X_train_tensors)  # 进行前向传播得到输出。
            optimizer.zero_grad()  # 清除之前的梯度信息。
            loss = criterion(outputs, y_train_tensors)  # 计算损失。
            epoch_loss += loss.item()  # 累加损失。
            loss.backward()  # 进行反向传播计算梯度。
            optimizer.step()  # 根据梯度更新模型参数。

        if epoch % 1 == 0:  # 每个周期结束后评估模型性能。
            model_for_train.eval()  # 设置模型为评估模式。
            result, rmse = testing_function(num_test, group_test)  # 调用测试函数得到性能指标。

            if rmse_temp < rmse and rmse_temp < 25:  # 如果新的RMSE没有改善且低于25，则停止训练。
                result, rmse = result_temp, rmse_temp
                break

            rmse_temp, result_temp = rmse, result  # 更新最佳RMSE和结果。
            print("Epoch: %d, loss: %1.5f, rmse: %1.5f" % (epoch, epoch_loss / ntrain, rmse))  #打印每个周期的损失和RMSE。

    return result, rmse  # 返回训练过程中的最佳结果和对应的RMSE。


SyntaxError: invalid character '，' (U+FF0C) (<ipython-input-22-18d83b22cde7>, line 2)

In [20]:
import pandas as pd
import matplotlib.pyplot as plt

def visualize(result, y_test, num_test, rmse):
    """
    可视化预测结果函数
    :param result: 预测结果列表
    :param y_test: 测试集的真实RUL值
    :param num_test: 样本数量
    :param rmse: 预测结果的均方根误差
    """
    result = y_test.join(pd.DataFrame(result))  # 将预测结果与真实RUL值合并为一个DataFrame
    result = result.sort_values('RUL', ascending=False)  # 按照RUL值降序排序

    # 提取真实和预测的RUL值
    true_rul = result.iloc[:, 0].to_numpy()  # 真实的剩余使用寿命
    pred_rul = result.iloc[:, 1].to_numpy()  # 预测的剩余使用寿命

    plt.figure(figsize=(10, 6))  # 设置图形大小
    plt.axvline(x=num_test, c='r', linestyle='--')  # 绘制红色虚线标示训练集大小

    plt.plot(true_rul, label='Actual RUL')  # 绘制真实RUL曲线
    plt.plot(pred_rul, label='Predicted RUL (RMSE = {})'.format(round(rmse, 3)))  # 绘制预测RUL曲线，并显示RMSE
    plt.title('Remaining Useful Life Prediction')  # 图表标题
    plt.legend()  # 显示图例
    plt.xlabel("Samples")  # X轴标签
    plt.ylabel("Remaining Useful Life")  # Y轴标签
    plt.savefig('./_trials/{} RUL Prediction with LSTM.png'.format(round(rmse, 3)))  # 保存图形到文件
    plt.show()  # 显示图形


In [18]:
if __name__ == "__main__":
    # 从数据集中获取基本信息
    group, group_test, y_test = load_FD001(MAX)  # 加载数据集并进行预处理，返回分组数据和测试数据的真实RUL值
    num_train, num_test = len(group.size()), len(group_test.size())  # 计算训练集和测试集中的样本数
    input_size = group.get_group(1).shape[1] - 3  # 计算模型输入特征的数量

    # 初始化LSTM模型
    model = LSTM1(input_size, N_HIDDEN, N_LAYER)  # 实例化LSTM模型，指定输入大小、隐藏层大小和层数
    criterion = torch.nn.MSELoss()  # 定义损失函数为均方误差
    optimizer = torch.optim.Adam(model.parameters(), lr=LR)  # 定义优化器为Adam，设置学习率

    # 训练并评估模型
    result, rmse = train(model, num_train, group)  # 调用训练函数，训练模型并获取测试结果和RMSE
    visualize(result, y_test, num_test, rmse)  # 调用可视化函数，显示预测结果和真实值的对比图


unit_nr
1      192
2      287
3      179
4      189
5      269
      ... 
96     336
97     202
98     156
99     185
100    200
Name: time_cycles, Length: 100, dtype: int64
Epoch: 1, loss: 10740.83138, rmse: 52.05190
Epoch: 2, loss: 4566.35310, rmse: 53.26039


KeyboardInterrupt: 