# 1.数据预处理

In [None]:
#假设已从LS-DYNA仿真中获得了数据并存为了CSV文件
import torch
import numpy as np
import pandas as pd
from sklearn.preprocessing import MinMaxScaler
from torch.utils.data import Dataset,DataLoader

#导入应变 FRP弹模 混凝土抗压强度 FRP厚度 FRP极限强度的数据
data = pd.read_csv("文件路径")

#定义时序参数
TIME_STEPS = 30  # 输入时间步长
PRED_STEPS = 3   # 预测步长

#选择需要的特征 假设是轴向应变 
features = data[['strain', 'elastic_modulus_FRP','compressive_strength_concrete', 'FRP_thickness', 'ultimate_strength_FRP' ]].values

#构建时间窗口数据集
def create_sequences(data, time_steps, pred_steps):
    X, y = [], []
    for i in range(len(data) - time_steps - pred_steps + 1):
        # 输入：连续30个时间步的所有特征
        X.append(data[i:i+time_steps, :])  
        # 输出：后续3个时间步的应变值（假设strain是第一列）
        y.append(data[i+time_steps:i+time_steps+pred_steps, 0])  
    return np.array(X), np.array(y)

X, y = create_sequences(features, TIME_STEPS, PRED_STEPS)

#数据标准化（归一化）
scaler_x = MinMaxScaler()
scaler_y = MinMaxScaler()

#将三维数据重塑为二维进行标准化
original_shape = X.shape
X = scaler_x.fit_transform(X.reshape(-1, X.shape[-1])).reshape(original_shape)
y = scaler_y.fit_transform(y.reshape(-1, 1)).reshape(y.shape[0], -1)

#划分训练集和测试集
train_size = int(0.8 * len(X))
X_train, X_test = X[:train_size], X[train_size:]
y_train, y_test = y[:train_size], y[train_size:]

#转换为torch张量
#自定义数据集类
class StrainDataset(Dataset):
    def __init__(self, features, labels):
        self.features = torch.tensor(features,dtype = torch.float32)
        self.labels = torch.tensor(labels, dtype = torch.float32)
        
    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, idx):
        
        return self.features[idx], self.labels[idx]
    
#构建数据加载器
train_dataset = StrainDataset(X_train, y_train)
test_dataset = StrainDataset(X_test, y_test)
#假设batch_size均为128
batch_size = 128
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
        


# 2.构建模型

## 2.1 LSTM网络

### 2.1.1 LSTM网络的搭建

In [None]:
import torch.nn as nn

#原文给出的是LSTM模型是具有128个神经元的单层结构  有两个具有64个和3个神经元的Dense层
#但由于文中给出的是GRU的原理图 因此按照原文进行建模
class StrainPredictionLSTM(nn.Module):
    def __init__(self,input_size,hidden_size,output_size,num_layers):
        super( StrainPredictionLSTM,self).__init__()
        
        #LSTM层 
        self.lstm = nn.LSTM(input_size,hidden_size,num_layers,batch_first=True)
        
        #全连接层，连接LSTM输出到最终预测 假设激活函数用的是RELU
        self.fc = nn.Sequential(
            nn.Linear(hidden_size,64),
            nn.ReLU(),
            nn.Linear(64,output_size)
        )
        
    def forward(self, x):
        #LSTM层的前向传播 假设没有定义要在GPU上跑
        lstm_out,_ = self.lstm(x) # lstm_out包含所有时间步的输出
        
        #取最后一个时间步的输出作为预测
        last_step = lstm_out[:,-1,:]
        
        return self.fc(last_step) 

#假设输入数据的维度位：batch_size×seq_len×input_size
#输入维度位30个时间步 特征维度为1
input_size = 5  # 输入特征的维度
hidden_size = 128  # LSTM隐藏层的维度
output_size = 3 # 输出维度
num_layers = 1  # LSTM层数
seq_len = 30  # 序列长度

#创建模型
model_LSTM = StrainPredictionLSTM(input_size = input_size, hidden_size = hidden_size, output_size = output_size, num_layers = num_layers)
    
#打印模型
print(model_LSTM)

### 2.1.1 LSTM网络的训练和评估

In [None]:
import torch.optim as optim
from sklearn.metrics import mean_squared_error
from torchmetrics import MeanAbsolutePercentageError

#定义损失函数和优化器
criterion_LSTM_rmse = nn.MSELoss()
criterion_LSTM_mape = MeanAbsolutePercentageError()


#训练过程
def train(model_LSTM, train_loader,test_loader, optimizer, num_epochs=400):#假设均进行400epoch
    
    train_rmse, test_rmse = [], []
    for epoch in range(num_epochs):
        #训练阶段
        model_LSTM.train()
        running_lstm_rmse_loss = 0.0
        #running_lstm_mape_loss = 0.0
        for inputs,targets in train_loader:
            optimizer_LSTM.zero_grad()
            #前向传播
            outputs = model_LSTM(inputs)
            #计算损失 以计算rmse为例
            lstm_rmse_loss = criterion_LSTM_rmse(outputs, targets)
            #lstm_mape_loss = criterion_LSTM_mape(outputs, targets)
            #反向传播
            lstm_rmse_loss.backward()
           # lstm_mape_loss.backward()
            #更新参数
            optimizer.step()
            running_lstm_rmse_loss += lstm_rmse_loss.item()
            #running_lstm_mape_loss += lstm_mape_loss.item()
            
        #评估阶段
        model.eval()
        with torch.no_grad():
            #训练集评估
            train_preds = torch.cat([model(batch[0]) for batch in train_loader])
            train_rmse.append(torch.sqrt(criterion(train_preds, train_loader.dataset.labels)).item())
            
            #测试集评估
            test_preds = torch.cat([model(batch[0]) for batch in test_loader])
            test_rmse.append(torch.sqrt(criterion(test_preds, test_loader.dataset.labels)).item())
            
       print(f'Epoch {epoch+1}/{num_epochs}, '
              f'Train RMSE: {train_rmse:.4f}, '
              f'Test RMSE: {test_rmse:.4f}')
    
    return train_rmse, test_rmse  # 返回历史记录     

        
# 模型初始化（注意输入维度修正）
model_LSTM = StrainPredictionLSTM(input_size=5)  # 关键参数修正

# 优化器设置
optimizer = optim.Adam(model_LSTM.parameters(), lr=1e-4)  

# 启动训练
train_rmse, test_rmse = train(
    model=model_LSTM,
    train_loader=train_loader,
    test_loader=test_loader,
    optimizer=optimizer
)

#计算最终的MAPE  #可以将损失函数改为MAPE再算一次  因为没办法同时返回两个损失函数
#但是可以把他们组合一下 即每个损失函数乘以一定的权重相加
with torch.no_grad():
    y_true = test_loader.dataset.labels
    y_pred = model_LSTM(test_loader.dataset.features)
    mape = criterion_LSTM_mape(y_pred, y_true).item() * 100 
    
print(f'Final Test RMSE: {test_rmse[-1]:.4f}')
print(f'Final Test MAPE: {mape:.2f}%')

#还需要绘制RMSE MAPE strain 的曲线  


## 2.2 GRU网络

### 2.2.1 GRU网络的搭建

In [None]:
#这里重新引入部分库
import torch
import torch.nn as nn
import torch.optim as optim

class StrainPredictionGRU(nn.Module):
    def __init__(self, input_size, hidden_size1, hidden_size2, output_size, num_layers=2):
        super(StrainPredictionGRU, self).__init__()
        
        #定义GRU层
        self.gru = nn.GRU(input_size, hidden_size1, num_layers, batch_first=True)
        
        #定义第二层GRU
        self.gru2 = nn.GRU(hidden_size1, hidden_size2, num_layers, batch_first=True)
        
        #定义全连接层
        self.fc = nn.Sequential(
            nn.Linear(hidden_size2, 64),
            nn.ReLU(),
            nn.Linear(64, 32)
            nn.ReLU(),
            nn.Linear(32, output_size)
        )
        
        def forward(self,x):
            #第一个GRU层的输出
            gru_out,_ = self.gru(x)
            
            #第二个GRU层的输出
            gru_out2,_ = self.gru2(gru_out)
            
            #使用最后一个时间步的隐藏状态去预测
            last_hidden_state = gru_out2[:,-1,:]
            
            #通过全连接层进行处理
            x = self.fc1(last_hidden_state)
            #x = self.dropout(x)  # Dropout层
            x = self.fc2(x)
            x = self.fc3(x)  # 最终输出层，3个预测值
            
            return x 
        
#模型参数
input_size = 5  # 每个时间步的特征数
hidden_size1 = 256  # 第一个GRU层的隐藏层大小
hidden_size2 = 128  # 第二个GRU层的隐藏层大小
output_size = 3  # 预测未来3个时间步的应变值
num_layers = 2  # GRU层数
seq_len = 30  # 输入序列长度（30个时间步）

#创建模型
model_GRU = StrainPredictionGRU(input_size=input_size, hidden_size1=hidden_size1, hidden_size2=hidden_size2, output_size=output_size, num_layers=num_layers)

#打印模型结构
print(model_GRU)

### 2.2.2 GRU网络的训练与评估

In [None]:
from torch.utils.data import Dataset,DataLoader
import numpy as np

#之前已经定义过了数据集 这里直接定义损失函数rmse和优化器
criterion_GRU_rmse = np.sqrt(mean_squared_error(all_labels, all_preds))
optimizer_GRU = optim.Adam(model.parameters(), lr=0.001)

#训练过程
def train(model_GRU, train_loader, test_loader, num_epochs=400):
    train_rmse_list, test_rmse_list = [], []
    for epoch in range(num_epochs):
        #训练阶段：
        model_GRU.train()
        running_loss = 0.0
        
        for inputs, targets in train_loader:
            optimizer.zero_grad()
            outputs = model_GRU(inputs)
            loss = criterion_GRU_rmse(outputs,targets)
            loss.backward()
            optimizer_GRU.step()
            running_loss += loss.item()
            
        #评估阶段
        model.eval()
        with torch.no_grad():
            # 训练集评估
            train_preds = torch.cat([model(batch[0]) for batch in train_loader])
            train_rmse = torch.sqrt(criterion(train_preds, train_loader.dataset.labels)).item()
            
            # 测试集评估
            test_preds = torch.cat([model(batch[0]) for batch in test_loader])
            test_rmse = torch.sqrt(criterion(test_preds, test_loader.dataset.labels)).item()
        
        train_rmse_list.append(train_rmse)
        test_rmse_list.append(test_rmse)
        print(f'Epoch {epoch+1}/{num_epochs} | '
              f'Train RMSE: {train_rmse:.4f} | '
              f'Test RMSE: {test_rmse:.4f}')
            
       
    return train_rmse_list, test_rmse_list
        

# 模型初始化
model_GRU = StrainPredictionGRU(input_size=5, hidden_size1=256, hidden_size2 = 128)

# 优化器设置（按论文参数）
optimizer_GRU = optim.Adam(model_GRU.parameters(), lr=1e-4)

# 启动训练
history = train(
    model=model_GRU,
    train_loader=train_loader,
    test_loader=test_loader,
    optimizer=optimizer
)

# 最终评估
def final_evaluate(model, loader):
    model.eval()
    with torch.no_grad():
        y_true = loader.dataset.labels
        y_pred = model(loader.dataset.features)
        rmse = torch.sqrt(nn.MSELoss()(y_pred, y_true)).item()
        mape = (torch.abs((y_pred - y_true)/y_true).mean().item() * 100
    return rmse, mape

train_rmse, train_mape = final_evaluate(model_GRU, train_loader)
test_rmse, test_mape = final_evaluate(model_GRU, test_loader)

print(f'[Final] Train RMSE: {train_rmse:.4f}, MAPE: {train_mape:.2f}%')
print(f'[Final] Test RMSE: {test_rmse:.4f}, MAPE: {test_mape:.2f}%')

## 2.3 搭建Basic GAN

### 2.3.1 搭建生成器GRU

In [None]:
import torch.nn as nn
import torch.nn.functional as F

class Generator(nn.Module):
    def __init__(self, input_features=5, output_features=1, seq_len=30):
        super(Generator, self).__init__()
        self.gru1 = nn.GRU(input_features, 1024, batch_first = True)
        self.gru2 = nn.GRU(1024, 512, batch_first = True)
        self.gru3 = nn.GRU(512, 256, batch_first = True)
        self.fc1 = nn.Linear(256,128)
        self.fc2 = nn.Linear(128, 64)
        self.fc3 = nn.Linear(64, output_features)
        
    def forward(self, z):
        out,_ = self.gru1(z)
        out1,_ = self.gru2(out)
        out2,_ = self.gru3(out1)
    
    #全连接层 使用最后一个时间步的隐藏状态进行预测
        out3 = torch.relu(self.fc1(out2[:,-1,:]))
        out4 = self.fc2(out3)
        out5 = self.fc3(out4)
        
        out5 = out5.unsqueeze(1).repeat(1, 3, 1)
    
        return out5



### 2.3.2 搭建判别器CNN

In [None]:
#判别器通常倾向于判断整个序列的真假 而不是序列中某个值的真假
class Discriminator(nn.Module):
    def __init__(self, input_channels):
        super(Discriminator, self).__init__()
        #conv1d用于处理一维数据（时间序列 文本数据 音频）
        self.conv1 = nn.Conv1d(input_channels=1,32,3)
        self.conv2 = nn.Conv1d(32,64,3)
        self.conv3 = nn.Conv1d(64, 128, 3)
        self.pool = nn.AdaptiveAvgPool1d(1)
        self.fc1 = nn.Linear(128, 220)
        self.fc2 = nn.Linear(220,1)
        
    def forward(self,x):
        # 将 (batch_size, seq_len, input_channels) 转换为 (batch_size, input_channels, seq_len)
        x = x.permute(0, 2, 1)  # 交换维度，以适应Conv1d输入
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        # 自适应池化至固定大小，并移除最后一个维度
        x = self.pool(x).squeeze(-1)
        #全连接层
        x = torch.LeakyReLU(self.fc1(x),negative_slope=0.01)
        #输出一个值，表示整个序列的真假
        x = torch.sigmoid(self.fc2(x))
        
        return x
    


### 2.3.3 GAN的搭建与训练

In [None]:
# 损失函数：RMSE
def criterion_GAN_rmse(real_data, fake_data):
    mse_loss = nn.MSELoss()
    return torch.sqrt(mse_loss(real_data, fake_data))

class GAN(nn.Module):
    def __init__(self, generator, discriminator):
        super(GAN, self).__init__()
        self.genarator = generator
        self.discriminator = discriminator
      #因为是时间序列问题，因此不需要随机噪声  
    def forward(self, real_data):
        fake_data = self.generator(real_data)
        return fake_data
    

In [None]:
#创建生成器和判别器
generator = Generator(input_features=5)
discriminator = Discriminator(input_channels=1)

# 优化器
optimizer_G = optim.Adam(generator.parameters(), lr=1e-4)
optimizer_D = optim.Adam(discriminator.parameters(), lr=1e-4)

#计算RMSE和MAPE
def evaluate_model_GAN(y_true, y_pred):
    rmse = np.sqrt(mean_squared_error(y_true, y_pred))  # 计算RMSE
    mape = np.mean(np.abs((y_true - y_pred) / y_true)) * 100  # 计算MAPE
    return rmse, mape

#训练函数
def train_GAN(generator, discriminator, epochs, batch_size=128,seq_len=30):
    for epoch in range(epochs):
        # 训练判别器
        for _ in range(5):  # 判别器更新次数
        
            z =  X_train_tensor[:batch_size, :seq_len, :]   # 获取真实数据
            fake_data = generator(z)  # 通过生成器生成假数据

            # 获取真实数据，时间步 31 到 33
            real_data = X_train_tensor[batch_size, seq_len:seq_len+3, :]  # 真实数据，时间步 31 到 33
            
            # 判别器优化
            optimizer_D.zero_grad()
            real_labels = torch.ones(batch_size, 1)  # 真实标签
            fake_labels = torch.zeros(batch_size, 1)  # 假标签
            
            real_loss = criterion_GAN_rmse(real_data, real_labels)
            fake_loss = criterion_GAN_rmse(fake_data, fake_labels)
            d_loss = (real_loss + fake_loss) / 2

            d_loss.backward()
            optimizer_D.step()
            
            
         # 训练生成器
        z =  X_train_tensor[:batch_size, :seq_len, :] 
        fake_data = generator(z)

        # 生成器优化
        optimizer_G.zero_grad()
        g_loss = -torch.mean(discriminator(fake_data))
        g_loss.backward()
        optimizer_G.step()

        print(f"Epoch {epoch}/{epochs} [D loss: {d_loss.item()}] [G loss: {g_loss.item()}]")
        
         # 在每个epoch结束后，评估模型性能（例如在训练集上计算RMSE和MAPE）
        if epoch % 10 == 0:  # 每10个epoch进行一次评估
            y_pred = fake_data.detach().numpy()  # 获取生成的假数据并转化为numpy数组
            y_true = y_train_tensor[batch_size, seq_len:seq_len+3, :].detach().numpy()  # 获取真实数据（31-33时间步的数据）
            
            # 计算 RMSE 和 MAPE
            rmse, mape = evaluate_model_GAN(y_true, y_pred)
            print(f"Epoch {epoch} [RMSE: {rmse}] [MAPE: {mape}]")

# 训练模型
train_GAN(generator, discriminator, epochs=400, batch_size=128)


# 在测试集上评估模型
def evaluate_on_test(generator, X_test_tensor, y_test_tensor):
    # 使用生成器生成预测数据
    y_pred = generator(X_test_tensor)  # 生成器的输出（假数据）

    # 计算评估指标
    rmse, mape = evaluate_model_GAN(y_test_tensor.numpy(), y_pred.detach().numpy())  # 注意转换为numpy数组
    print(f"RMSE: {rmse}, MAPE: {mape}")




## 2.4 StrainNet  即WGAN-GP

In [None]:
#CNN和GAN中的discriminator相同  generator不同  因此重新设置GRU网络
class StrainGenerator(nn.Module):
    def __init__(self, input_features=5, output_features=1, seq_len=30):
        super(Generator, self).__init__()
        self.gru1 = nn.GRU(input_features, 256, batch_first=True)
        self.gru2 = nn.GRU(256, 128, batch_first=True)
        self.fc1 = nn.Linear(128, 64)
        self.fc2 = nn.Linear(64, 32)  
        self.fc3 = nn.Linear(32, output_features)

    def forward(self, z):
        out, _ = self.gru1(z)
        out, _ = self.gru2(out)
        out = torch.relu(self.fc1(out[:, -1, :]))  # 取最后一个时间步的输出
        out = self.fc2(out)
        out = self.fc3(out)
        out =out.unsqueeze(1).repeat(1,3, 1)
        return out

In [None]:
class StrainNetTrainer:
    def __init__(self, latent_dim=5, gp_weight=10):
        self.generator = StrainGenerator(latent_dim)
        self.discriminator = Discriminator()
        self.gp_weight = gp_weight
        
        # 使用RMSprop优化器
        self.opt_g = optim.RMSprop(self.generator.parameters(), lr=1e-4)
        self.opt_d = optim.RMSprop(self.discriminator.parameters(), lr=1e-4)
        
    def gradient_penalty(self, real, fake):
        batch_size = real.size(0)
         # 生成一个 alpha 向量，alpha 是一个在 [0, 1] 区间均匀分布的随机数
        # alpha 用来生成真实数据和假数据之间的插值
        alpha = torch.rand(batch_size, 1, 1)
        # 计算生成的插值数据：线性插值
        interpolates = (alpha * real + (1 - alpha) * fake).requires_grad_(True)
        # 通过判别器对插值数据进行评分
        d_interpolates = self.discriminator(interpolates)
        
        # 计算梯度：我们通过自动微分计算判别器对插值数据的梯度
        gradients = torch.autograd.grad(
            outputs=d_interpolates,# 判别器的输出
            inputs=interpolates,# 插值数据
            grad_outputs=torch.ones_like(d_interpolates),# 梯度的初始值，通常为 1，因为我们计算的是一阶导数
            create_graph=True,# 为了计算高阶导数，我们需要保留图
            retain_graph=True # 保留计算图用于后续的反向传播
        )[0]
        
        # 展平梯度矩阵，使其变成一个一维的向量，便于计算范数
        gradients = gradients.view(batch_size, -1)
        # 计算梯度的范数（L2范数），表示梯度的大小
        grad_norm = gradients.norm(2, dim=1)
        
        # 返回梯度惩罚项：计算梯度范数与1的差的平方的均值
        return torch.mean((grad_norm - 1) ** 2)
    
    def train_step(self, real_data):
        # 训练判别器
        for _ in range(5):  # 判别器迭代次数
            # 生成假数据
            z =  X_train_tensor[:batch_size, :seq_len, :]
            fake_data = self.generator(z)
            
            # 计算Wasserstein距离
            real_loss = -torch.mean(self.discriminator(real_data))
            fake_loss = torch.mean(self.discriminator(fake_data.detach()))
            gp = self.gradient_penalty(real_data, fake_data)
            d_loss = real_loss + fake_loss + self.gp_weight * gp
            
            self.opt_d.zero_grad()
            d_loss.backward()
            self.opt_d.step()
            
    
        # 训练生成器
        fake_data = self.generator(z)
        g_loss = -torch.mean(self.discriminator(fake_data))
        
        self.opt_g.zero_grad()
        g_loss.backward()
        self.opt_g.step()
        
        return {'d_loss': d_loss.item(), 'g_loss': g_loss.item()}

In [None]:
# 数据加载器示例
class StrainDataset(Dataset):
    def __init__(self, data_tensor):
        # 输入形状应为 (num_samples, 30, 5)
        self.data = data_tensor
        
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx]

# 初始化数据加载器
train_dataset = StrainDataset(X_train_tensor)  # X_train_tensor.shape: (N,30,5)
train_loader = DataLoader(train_dataset, batch_size=128, shuffle=True)

In [None]:
def evaluate_strainnet(generator, train_loader, batch_size=128):
    # 生成样本
    z =  X_train_tensor[:batch_size, :seq_len, :]
    gen_data = generator(z)  # (batch_size, 30, 5)
    
    # 获取真实样本
    real_data = X_train_tensor[batch_size, seq_len:seq_len+3, :]
    
    # 计算特征统计相似度
    metrics_train = {
        '特征RMSE': torch.sqrt(((gen_data - real_data[:n_samples])**2).mean()),
        '应变分布KL散度': calculate_kl_divergence(
            gen_data[:, :, 0].flatten(),  # 应变特征
            real_data[:, :, 0].flatten()
        )
    }
    return metrics_train

# 在测试集上评估模型
def evaluate_on_test_strainnet(generator, X_test_tensor):
    z =  X_test_tensor[:batch_size, :seq_len, :]
    gen_data = generator(z)  # (batch_size, 30, 5)
    
    # 获取真实样本
    real_data = X_test_tensor[batch_size, seq_len:seq_len+3, :]
    
    # 计算特征统计相似度
    metrics_train = {
        '特征RMSE': torch.sqrt(((gen_data - real_data[:n_samples])**2).mean()),
        '应变分布KL散度': calculate_kl_divergence(
            gen_data[:, :, 0].flatten(),  # 应变特征
            real_data[:, :, 0].flatten()
        )
    }
    return metrics