In [None]:
!pip install lightning
!pip install fastexcel

In [None]:
import numpy as np
import pandas as pd
import polars as pl
import matplotlib.pyplot as plt
from sklearn.preprocessing import StandardScaler

df = pl.read_excel('/kaggle/input/jk-pinn-2404-3/().xlsx')
df

In [None]:
X = df.select(pl.col('应变幅度'), pl.col('保持时间'), pl.col('应变率'), pl.col('温度T'), pl.col('C'), pl.col('Ni'), pl.col('Co'), pl.col('Cr'), pl.col('Mo'), pl.col('Al'), pl.col('Ti')).to_numpy()
y = df.select(pl.col('循环次数')).to_numpy()

In [None]:
#from tsaug import TimeWarp, Crop, Quantize, Drift, Reverse, AddNoise
#augmenter = (
#    AddNoise(scale=0.03)
#)
#
#Xy = np.hstack([X, y])
#Xy = np.vstack([Xy ,augmenter.augment(Xy)])
#Xy = np.vstack([Xy ,augmenter.augment(Xy)])
#Xy = np.vstack([Xy ,augmenter.augment(Xy)])
#Xy = np.vstack([Xy ,augmenter.augment(Xy[:32, :])])

In [None]:
import os
import torch
from torch import optim, nn, utils, Tensor
from torchvision.datasets import MNIST
from torchvision.transforms import ToTensor
import lightning as L

import torch.nn.functional as F
from torchvision import transforms
from torchvision.datasets import MNIST
from torch.utils.data import Dataset, DataLoader
from lightning.pytorch.callbacks.early_stopping import EarlyStopping

from sklearn.metrics import mean_squared_error, mean_absolute_percentage_error
# define any number of nn.Modules (or use your current ones)


# define the LightningModule
class LitMlpModel(L.LightningModule):
    def __init__(self, lamb):
        super().__init__()
        self.fc1 = nn.Sequential(nn.Linear(11, 36), nn.GELU(), nn.BatchNorm1d(36, 36), nn.Dropout(0.1))
        self.fc2 = nn.Sequential(nn.Linear(36, 36), nn.GELU(), nn.BatchNorm1d(36, 36), nn.Dropout(0.1))
        self.fc3 = nn.Sequential(nn.Linear(36, 1))
        self.relu = nn.ReLU()
        self.train_losses = []
        self.valid_losses = []
        self.train_losses_epoch = []
        self.valid_losses_epoch = []
        self.phy_losses = []
        self.phy_losses_epoch = []
        self.partial_deri_1 = []
        self.partial_deri_2 = []
        self.partial_deri_3 = []
        self.partial_deri_1_final = []
        self.partial_deri_2_final = []
        self.partial_deri_3_final = []
        self.lamb = lamb
        
    def forward(self, batch):
        x, y= batch
        return self.fc3(self.fc2(self.fc2(self.fc2(self.fc1(x)))))
        
    def training_step(self, batch, batch_idx):
        # training_step defines the train loop.
        # it is independent of forward
        x, y = batch
        #y = torch.log10(y)
        y_hat = self.fc3(self.fc2(self.fc2(self.fc2(self.fc2(self.fc1(x))))))

        y_hat_x1 = torch.autograd.grad(y_hat, x, grad_outputs=torch.ones_like(y_hat), create_graph=True)[0][:, [0]]
        y_hat_x2 = torch.autograd.grad(y_hat, x, grad_outputs=torch.ones_like(y_hat), create_graph=True)[0][:, [1]]
        y_hat_x2x2 = -torch.autograd.grad(y_hat_x2, x, grad_outputs=torch.ones_like(y_hat), create_graph=True)[0][:, [1]]
        
        #torch.sum(y_hat_x1) + torch.sum(y_hat_x2) + torch.sum(y_hat_x2x2)
        lamb = self.lamb
        phy_loss = (torch.sum(self.relu(y_hat_x1)) + torch.sum(self.relu(y_hat_x2)) + torch.sum(self.relu(y_hat_x2x2))) / len(y_hat_x1)
        loss = (1 - lamb) * nn.functional.mse_loss(y_hat, y) + lamb * (phy_loss)
        # Logging to TensorBoard (if installed) by default
        self.log("train_loss", loss, on_epoch=True, prog_bar=True)
        self.train_losses.append(loss.item())  # 保存 loss 数值
        self.phy_losses.append(phy_loss.item())
        self.partial_deri_1 += y_hat_x1
        self.partial_deri_2 += y_hat_x2
        self.partial_deri_3 += y_hat_x2x2
        
        return loss

    def on_train_epoch_end(self):
        train_loss_epoch = torch.mean(torch.tensor(self.train_losses, dtype=torch.float32))
        self.train_losses_epoch.append(train_loss_epoch)
        self.train_losses.clear()  # free memory

        phy_loss_epoch = torch.mean(torch.tensor(self.phy_losses, dtype=torch.float32))
        self.phy_losses_epoch.append(phy_loss_epoch)
        self.phy_losses.clear()  # free memory
        
        self.partial_deri_1_final = self.partial_deri_1.copy()
        self.partial_deri_1.clear()
        self.partial_deri_2_final = self.partial_deri_2.copy()
        self.partial_deri_2.clear()
        self.partial_deri_3_final = self.partial_deri_3.copy()
        self.partial_deri_3.clear()

    def validation_step(self, batch, batch_idx):
        # training_step defines the train loop.
        # it is independent of forward
        x, y = batch
        #y = torch.log10(y)
        y_hat = self.fc3(self.fc2(self.fc2(self.fc2(self.fc1(x)))))

        #y_hat_x1 = self.relu(torch.autograd.grad(y_hat, x, grad_outputs=torch.ones_like(y_hat), create_graph=True)[0][:, [0]])
        #y_hat_x2 = self.relu(torch.autograd.grad(y_hat, x, grad_outputs=torch.ones_like(y_hat), create_graph=True)[0][:, [1]])
        #y_hat_x2x2 = self.relu(-torch.autograd.grad(y_hat_x2, x, grad_outputs=torch.ones_like(y_hat), create_graph=True)[0][:, [1]])
        
        loss = nn.functional.mse_loss(y_hat, y) #+ (torch.sum(y_hat_x1) + torch.sum(y_hat_x2) + torch.sum(y_hat_x2x2)) / len(y_hat_x1)
        # Logging to TensorBoard (if installed) by default
        self.log("valid_loss", loss, on_epoch=True, prog_bar=True)
        self.valid_losses.append(loss.item())  # 保存 loss 数值
        return loss

    def on_validation_epoch_end(self):
        valid_loss_epoch = torch.mean(torch.tensor(self.valid_losses, dtype=torch.float32))
        self.valid_losses_epoch.append(valid_loss_epoch)
        self.valid_losses.clear()  # free memory

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=1e-3)
        return optimizer




In [None]:
# Step 1: 定义自定义 Dataset
class MyTabularDataset(Dataset):
    def __init__(self, X, y):
        self.X = X  # 特征
        self.y = y   # 标签

    def __len__(self):
        return len(self.X)

    def __getitem__(self, idx):
        # 返回张量格式
        X = torch.tensor(self.X[idx], dtype=torch.float32, requires_grad=True)
        y = torch.tensor(self.y[idx], dtype=torch.float32, requires_grad=True)  # 或 long 对于分类
        return X, y

# Step 2: 创建 Dataset 实例
from sklearn.model_selection import KFold
kf = KFold(n_splits=6, shuffle=True, random_state=42)



phy_losses_lamb = {}
y_pred_lamb = {}
partial_lamb = {}

for lamb in [0.0, 0.6]:
    y_pred = np.zeros_like(y)
    for i, (trn_idx, val_idx) in enumerate(kf.split(X)):
        scaler = StandardScaler().fit(X[trn_idx, :])
        X_std_trn = scaler.transform(X[trn_idx, :])
        X_std_val = scaler.transform(X[val_idx, :])
        
        train_loader = DataLoader(MyTabularDataset(X_std_trn, y[trn_idx]), batch_size=8, shuffle=True)
        valid_loader = DataLoader(MyTabularDataset(X_std_val, y[val_idx]), batch_size=8, shuffle=True)
        
        early_stop_callback = EarlyStopping(
            monitor="valid_loss",       # 监控的指标，通常是验证损失
            patience=50,               # 如果连续 3 次没有提升就停止
            mode="min",               # 因为是 loss，越小越好
            verbose=False              # 是否打印日志
        )
        # init the autoencoder
        
        mlpmodel = LitMlpModel(lamb)
        # train the model (hint: here are some helpful Trainer arguments for rapid idea iteration)
        trainer = L.Trainer(limit_train_batches=100000, max_epochs=500, callbacks=[early_stop_callback])
        trainer.fit(mlpmodel, train_loader, valid_loader)
        
        y_pred[val_idx] = mlpmodel((torch.tensor(X_std_val, dtype=torch.float32, requires_grad=True), torch.tensor(y[val_idx], dtype=torch.float32, requires_grad=True))).detach().numpy()
        
        plt.figure(figsize=(12, 8), dpi=100)
        plt.plot(mlpmodel.train_losses_epoch, label='Train_Losses')
        plt.plot(mlpmodel.valid_losses_epoch, label='Valid_Losses')
        plt.title(f"Training Loss Curve Fold {i+1} with Lambda {round(lamb,1)}")
        plt.xlabel("Epoch")
        plt.ylabel("Loss")
        plt.legend()
        plt.savefig(f"Loss_Curve_{i+1}_with_Lambda_{lamb}.png")
        plt.show()
        phy_losses_lamb[f'Fold_{i+1}_Lamb_{lamb}'] = mlpmodel.phy_losses_epoch

    partial_lamb[f'Lamb_{lamb}_1'] = mlpmodel.partial_deri_1_final
    partial_lamb[f'Lamb_{lamb}_2'] = mlpmodel.partial_deri_2_final
    partial_lamb[f'Lamb_{lamb}_3'] = mlpmodel.partial_deri_3_final
    y_pred_lamb[f'Lamb_{lamb}'] = y_pred
    
    #print(f'Lambda {lamb} MAPE Ncf: ', mean_absolute_percentage_error(y, 10**y_pred))
    #print(f'Lambda {lamb} MSE Ncf: ', mean_squared_error(y, 10**y_pred))
    #print(f'Lambda {lamb} MAPE lgNcf: ', mean_absolute_percentage_error(np.log10(y), y_pred))
    #print(f'Lambda {lamb} MSE lgNcf: ', mean_squared_error(np.log10(y), y_pred))
    
    print(f'Lambda {lamb} MAPE Ncf: ', mean_absolute_percentage_error(y, y_pred))
    print(f'Lambda {lamb} MSE Ncf: ', mean_squared_error(y, y_pred))

In [None]:
torch.tensor(partial_lamb[f'Lamb_{0.0}_{1}'])

In [None]:
x1_dnn = torch.tensor(partial_lamb[f'Lamb_{0.0}_{1}']).detach().numpy()
x1_pinn = torch.tensor(partial_lamb[f'Lamb_{0.6}_{1}']).detach().numpy()

x2_dnn = torch.tensor(partial_lamb[f'Lamb_{0.0}_{2}']).detach().numpy()
x2_pinn = torch.tensor(partial_lamb[f'Lamb_{0.6}_{2}']).detach().numpy()

x3_dnn = torch.tensor(partial_lamb[f'Lamb_{0.0}_{3}']).detach().numpy()
x3_pinn = torch.tensor(partial_lamb[f'Lamb_{0.6}_{3}']).detach().numpy()

# 创建索引用于 y 轴
y_ = np.arange(1, len(x1_dnn) + 1)

# 子图设置
fig, axs = plt.subplots(1, 3, figsize=(16, 5), sharey=True)

# 子图标题与标签
titles = ['(a)', '(b)', '(c)']
xlabels = [r'$\partial N_{cf}/\partial \Delta \varepsilon_a$', 
           r'$\partial N_{cf}/\partial t_h$', 
           r'$\partial^2 N_{cf}/\partial t_h^2$']
xvals = [(x1_dnn, x1_pinn), (x2_dnn, x2_pinn), (x3_dnn, x3_pinn)]

for i in range(3):
    ax = axs[i]
    dnn, pinn = xvals[i]

    # 连线
    for j in range(len(x1_dnn)):
        ax.plot([dnn[j], pinn[j]], [y_[j], y_[j]], color='gray', linewidth=0.8)

    # 散点
    ax.scatter(dnn, y_, label='DNN')
    ax.scatter(pinn, y_, edgecolor='black', label='PINN')

    # 阴影区域
    #ax.axvspan(shaded_regions[i][0], shaded_regions[i][1], ymin=0, ymax=1, color='red', alpha=0.15)

    # 文本注释
    #ax.text(np.mean(shaded_regions[i]), 20, 'Inconsistent\nwith Physics', color='brown', fontsize=10)

    # 标签与标题
    ax.set_title(titles[i], loc='left', fontsize=12)
    ax.set_xlabel(xlabels[i], fontsize=12)
    #ax.set_xlim(xlims[i])
    ax.grid(True, linestyle='--', alpha=0.5)

# Y轴标签统一放在左边
axs[0].set_ylabel('Creep-fatigue data points', fontsize=12)

# 图例只加一次
axs[0].legend()

plt.tight_layout()
plt.savefig('Creep-fatigue data points.png')


In [None]:
#for i in range(6):
#    plt.figure(figsize=(12, 8), dpi=100)
#    for lamb in np.arange(0, 1, 0.2):
#        plt.plot(phy_losses_lamb[f'Fold_{i+1}_Lamb_{lamb}'], label=f'Lambda={round(lamb, 1)}')
#    plt.legend()
#    plt.ylabel('Physics Loss')
#    plt.xlabel('Epoch')
#    plt.title(f'Physics Loss Fold {i+1} with Lambda {round(lamb, 1)}')
#    plt.savefig(f'Physics Loss Fold {i+1} with Lambda {int(lamb*10)}.png')

In [None]:
times = df.select(pl.col('循环次数')).to_numpy().ravel()
np.random.seed(42)

markers = ["o", "v", "^", "<", ">", "p"]

plt.figure(figsize=(12, 8), dpi=200)
plt.grid(True)  # 显示网格线

from sklearn.model_selection import KFold
kf = KFold(n_splits=6, shuffle=True, random_state=42)

for i, ((trn_idx, val_idx), mark) in enumerate(zip(kf.split(times), markers)):
    plt.scatter(y[val_idx], y_pred_lamb[f'Lamb_{0.0}'][val_idx], label=f'Fold {1+i}', marker=mark)
    

plt.plot([y.min()*0.5, y.max()*2], [y.min()*0.5, y.max()*2], color='blue', alpha=0.5)
factor = 0.25
plt.fill_between([y.min()*0.5, times.max()*2], [y.min()*0.5*(factor), y.max()*2*factor], [y.min()*0.5/factor, y.max()*2/factor], alpha=0.1, label='±2σ')

plt.legend()
plt.xscale('log')     # 对横轴取 log
plt.yscale('log')
plt.xlim((y.min()-10, y.max()+3000))
plt.ylim((y.min()-10, y.max()+3000))
plt.xlabel('Experimental creep-fatigue life')
plt.ylabel('Predicted creep-fatigue life')
plt.title(f'Inconel-617 Creep-Fatigue Life Prediction with PINN')
plt.savefig(f'Inconel-617 Creep-Fatigue Life Prediction with PINN.png')

In [None]:
times = df.select(pl.col('循环次数')).to_numpy().ravel()
np.random.seed(42)

markers = ["o", "v", "^", "<", ">", "p"]

plt.figure(figsize=(12, 8), dpi=200)
plt.grid(True)  # 显示网格线

from sklearn.model_selection import KFold
kf = KFold(n_splits=6, shuffle=True, random_state=42)

for i, ((trn_idx, val_idx), mark) in enumerate(zip(kf.split(times), markers)):
    plt.scatter(y[val_idx], y_pred_lamb[f'Lamb_{0.6}'][val_idx], label=f'Fold {1+i}', marker=mark)
    

plt.plot([y.min()*0.5, y.max()*2], [y.min()*0.5, y.max()*2], color='blue', alpha=0.5)
factor = 0.25
plt.fill_between([y.min()*0.5, times.max()*2], [y.min()*0.5*(factor), y.max()*2*factor], [y.min()*0.5/factor, y.max()*2/factor], alpha=0.1, label='±2σ')

plt.legend()
plt.xscale('log')     # 对横轴取 log
plt.yscale('log')
plt.xlim((y.min()-10, y.max()+3000))
plt.ylim((y.min()-10, y.max()+3000))
plt.xlabel('Experimental creep-fatigue life')
plt.ylabel('Predicted creep-fatigue life')
plt.title(f'Inconel-617 Creep-Fatigue Life Prediction with DNN')
plt.savefig(f'Inconel-617 Creep-Fatigue Life Prediction with DNN.png')