In [None]:
# Install necessary packages (runs in the notebook environment)
# When running in Jupyter, these magic commands install missing modules into the kernel.
%pip install -q torch torchvision torchaudio
%pip install -q torch-geometric

import os
import numpy as np
import pandas as pd

from sklearn.model_selection import train_test_split
from sklearn.metrics import mean_squared_error
from sklearn.preprocessing import StandardScaler

import torch
import torch.nn as nn
import torch.nn.functional as F

from torch_geometric.data import Data, DataLoader
from torch_geometric.nn import GCNConv

# ============================================================
# 1. Load dataset
# ============================================================

CSV_PATH = "Battery_dataset 2.csv"   # adjust if needed
df = pd.read_csv(CSV_PATH)

print("Head:")
print(df.head())
print("\nColumns:", df.columns.tolist())

# Expected:
# ['battery_id','cycle','chI','chV','chT','disI','disV','disT','BCt','SOH','RUL']

df = df.sort_values(["battery_id", "cycle"]).reset_index(drop=True)

feature_cols = ["chI", "chV", "chT", "disI", "disV", "disT", "BCt"]
target_cols = ["SOH", "RUL"]

# ============================================================
# 2. Train / Test split at battery level
# ============================================================

batteries = df["battery_id"].unique()
train_batt, test_batt = train_test_split(batteries, test_size=0.3, random_state=42)

print("\nTrain batteries:", train_batt)
print("Test batteries:", test_batt)

df_train = df[df["battery_id"].isin(train_batt)].copy()
df_test  = df[df["battery_id"].isin(test_batt)].copy()

# ============================================================
# 3. Feature scaling
# ============================================================

scaler = StandardScaler()
df_train[feature_cols] = scaler.fit_transform(df_train[feature_cols].values)
df_test[feature_cols]  = scaler.transform(df_test[feature_cols].values)

# (optional) Save scaler for later use
import joblib
joblib.dump(scaler, "pinn_gnn_scaler.pkl")

# ============================================================
# 4. Build graphs per battery
# ============================================================

def build_graphs_from_df(df_part):
    graphs = []
    for bid, grp in df_part.groupby("battery_id"):
        grp = grp.sort_values("cycle").reset_index(drop=True)
        x = torch.tensor(grp[feature_cols].values, dtype=torch.float)    # [N, F]
        y = torch.tensor(grp[target_cols].values, dtype=torch.float)     # [N, 2]

        num_nodes = len(grp)

        # Undirected edges for GCN
        undirected_edges = []
        # Directed edges (earlier -> later) for physics
        directed_edges = []
        for i in range(num_nodes - 1):
            # undirected
            undirected_edges.append([i, i+1])
            undirected_edges.append([i+1, i])
            # directed: earlier -> later
            directed_edges.append([i, i+1])

        edge_index = torch.tensor(undirected_edges, dtype=torch.long).t().contiguous()
        edge_index_mono = torch.tensor(directed_edges, dtype=torch.long).t().contiguous()

        data = Data(x=x, edge_index=edge_index, y=y)
        data.edge_index_mono = edge_index_mono   # store separately
        data.battery_id = bid
        data.cycle = torch.tensor(grp["cycle"].values, dtype=torch.float)

        graphs.append(data)
    return graphs

train_graphs = build_graphs_from_df(df_train)
test_graphs  = build_graphs_from_df(df_test)

print(f"\nNum train graphs: {len(train_graphs)}, Num test graphs: {len(test_graphs)}")

# DataLoaders
batch_size = 4
train_loader = DataLoader(train_graphs, batch_size=batch_size, shuffle=True)
test_loader  = DataLoader(test_graphs, batch_size=1, shuffle=False)

# ============================================================
# 5. Define PINN-GNN model (multi-output: SOH & RUL)
# ============================================================

class PINNGNN(nn.Module):
    def __init__(self, in_channels, hidden_channels=64, out_channels=2):
        super().__init__()
        self.conv1 = GCNConv(in_channels, hidden_channels)
        self.conv2 = GCNConv(hidden_channels, hidden_channels)
        self.conv3 = GCNConv(hidden_channels, hidden_channels)
        self.conv4 = GCNConv(hidden_channels, hidden_channels)
        self.conv5 = GCNConv(hidden_channels, hidden_channels)
        

        self.lin1  = nn.Linear(hidden_channels, hidden_channels)
        self.lin2  = nn.Linear(hidden_channels, out_channels)

    def forward(self, x, edge_index):
        # x: [N, F]
        # edge_index: [2, E]
        h = self.conv1(x, edge_index)
        h = F.relu(h)

        h = self.conv2(h, edge_index)
        h = F.relu(h)

        h = self.conv3(h, edge_index)
        h = F.relu(h)

        h = self.conv4(h, edge_index)
        h = F.relu(h)
        
        h = self.conv5(h, edge_index)
        h = F.relu(h)


        h = self.lin1(h)
        h = F.relu(h)

        out = self.lin2(h)   # [N, 2] -> SOH, RUL
        return out

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = PINNGNN(in_channels=len(feature_cols)).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

# ============================================================
# 6. Physics-informed loss components
# ============================================================

def rmse_torch(y_true, y_pred):
    return torch.sqrt(F.mse_loss(y_pred, y_true))

def physics_losses(pred, edge_index_mono, edge_index_smooth):
    """
    pred: [N, 2]
    edge_index_mono: [2, E_m] earlier -> later
    edge_index_smooth: [2, E_s] undirected for smoothness
    """
    soh_pred = pred[:, 0]
    rul_pred = pred[:, 1]  # not used yet, but kept for future physics

    # ---- Monotonicity (SOH at later cycle <= earlier cycle) ----
    src_m = edge_index_mono[0]
    dst_m = edge_index_mono[1]

    soh_diff = soh_pred[dst_m] - soh_pred[src_m]  # later - earlier
    mono_violation = torch.clamp(soh_diff, min=0.0)
    mono_loss = (mono_violation ** 2).mean()

    # ---- Smoothness (use undirected edges) ----
    src_s = edge_index_smooth[0]
    dst_s = edge_index_smooth[1]

    diff_vec = pred[dst_s] - pred[src_s]   # [E_s, 2]
    smooth_loss = (diff_vec ** 2).mean()

    return mono_loss, smooth_loss

lambda_mono = 1.0
lambda_smooth = 0.1

# ============================================================
# 7. Training loop
# ============================================================

def train_one_epoch():
    model.train()
    total_loss = 0.0
    total_rmse_soh = 0.0
    total_rmse_rul = 0.0
    count = 0

    for batch in train_loader:
        batch = batch.to(device)
        optimizer.zero_grad()

        out = model(batch.x, batch.edge_index)   # [total_nodes_in_batch, 2]
        y_true = batch.y                         # same shape

        # Supervised loss
        loss_sup = F.mse_loss(out, y_true)

        # Physics-informed losses
        # FIXED: pass both edge_index_mono and edge_index
        mono_loss, smooth_loss = physics_losses(
            out,
            batch.edge_index_mono,
            batch.edge_index
        )

        loss = loss_sup + lambda_mono * mono_loss + lambda_smooth * smooth_loss
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

        # RMSE for logging (on batch)
        soh_true = y_true[:, 0]
        rul_true = y_true[:, 1]
        soh_pred = out[:, 0]
        rul_pred = out[:, 1]
        total_rmse_soh += rmse_torch(soh_true, soh_pred).item()
        total_rmse_rul += rmse_torch(rul_true, rul_pred).item()
        count += 1

    return total_loss / count, total_rmse_soh / count, total_rmse_rul / count

@torch.no_grad()
def eval_loader(loader):
    model.eval()
    all_true_soh, all_pred_soh = [], []
    all_true_rul, all_pred_rul = [], []

    for batch in loader:
        batch = batch.to(device)
        out = model(batch.x, batch.edge_index)
        y_true = batch.y

        all_true_soh.append(y_true[:, 0].cpu().numpy())
        all_true_rul.append(y_true[:, 1].cpu().numpy())
        all_pred_soh.append(out[:, 0].cpu().numpy())
        all_pred_rul.append(out[:, 1].cpu().numpy())

    all_true_soh = np.concatenate(all_true_soh)
    all_pred_soh = np.concatenate(all_pred_soh)
    all_true_rul = np.concatenate(all_true_rul)
    all_pred_rul = np.concatenate(all_pred_rul)

    rmse_soh = np.sqrt(mean_squared_error(all_true_soh, all_pred_soh))
    rmse_rul = np.sqrt(mean_squared_error(all_true_rul, all_pred_rul))

    return rmse_soh, rmse_rul, (all_true_soh, all_pred_soh, all_true_rul, all_pred_rul)

num_epochs = 80
best_rmse_soh = float("inf")
best_state = None

for epoch in range(1, num_epochs + 1):
    train_loss, train_rmse_soh, train_rmse_rul = train_one_epoch()
    test_rmse_soh, test_rmse_rul, _ = eval_loader(test_loader)

    print(f"Epoch {epoch:03d} | "
          f"TrainLoss={train_loss:.5f} | "
          f"TrainRMSE(SOH)={train_rmse_soh:.5f} | TrainRMSE(RUL)={train_rmse_rul:.5f} | "
          f"TestRMSE(SOH)={test_rmse_soh:.5f} | TestRMSE(RUL)={test_rmse_rul:.5f}")

    if test_rmse_soh < best_rmse_soh:
        best_rmse_soh = test_rmse_soh
        best_state = model.state_dict()

# Load best state
if best_state is not None:
    model.load_state_dict(best_state)
    print("\nLoaded best model based on SOH RMSE.")

# Final evaluation
test_rmse_soh, test_rmse_rul, (true_soh, pred_soh, true_rul, pred_rul) = eval_loader(test_loader)
print("\n=== FINAL TEST PERFORMANCE (PINN-GNN) ===")
print(f"SOH RMSE : {test_rmse_soh:.6f}")
print(f"RUL RMSE : {test_rmse_rul:.6f}")

# ============================================================
# 8. Save model
# ============================================================

MODEL_PATH = "pinn_gnn_model.pt"
torch.save(model.state_dict(), MODEL_PATH)
print(f"\nSaved PINN-GNN model to: {os.path.abspath(MODEL_PATH)}")


Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Head:
  battery_id  cycle       chI       chV        chT      disI      disV  \
0         B5      1  1.440147  4.254682  23.988733  1.894407  3.273523   
1         B5      2  1.416595  4.159825  25.665347  1.829949  4.038741   
2         B5      3  1.420272  4.276323  25.407910  1.942105  3.214433   
3         B5      4  1.337680  4.236697  27.069757  2.073577  3.134529   
4         B5      5  1.263946  4.142791  26.478353  2.049885  3.729341   

        disT       BCt        SOH  RUL  
0  32.980834  1.986196  99.309790  219  
1  32.257920  1.986240  99.311985  218  
2  35.134801  1.984252  99.212608  217  
3  32.082988  1.969236  98.461812  216  
4  32.483154  1.974862  98.743106  215  

Columns: ['battery_id', 'cycle', 'chI', 'chV', 'chT', 'disI', 'disV', 'disT', 'BCt', 'SOH', 'RUL']

Train batteries: ['B6' 'B7']
Test batteries: ['B5']

Num train graphs:

  train_loader = DataLoader(train_graphs, batch_size=batch_size, shuffle=True)
  test_loader  = DataLoader(test_graphs, batch_size=1, shuffle=False)


Epoch 022 | TrainLoss=11478.58008 | TrainRMSE(SOH)=72.18861 | TrainRMSE(RUL)=133.21397 | TestRMSE(SOH)=73.22641 | TestRMSE(RUL)=125.83380
Epoch 023 | TrainLoss=11454.48145 | TrainRMSE(SOH)=72.04282 | TrainRMSE(RUL)=133.11195 | TestRMSE(SOH)=73.05445 | TestRMSE(RUL)=125.71004
Epoch 024 | TrainLoss=11425.89746 | TrainRMSE(SOH)=71.87071 | TrainRMSE(RUL)=132.99019 | TestRMSE(SOH)=72.85189 | TestRMSE(RUL)=125.56163
Epoch 025 | TrainLoss=11391.98438 | TrainRMSE(SOH)=71.66848 | TrainRMSE(RUL)=132.84424 | TestRMSE(SOH)=72.61346 | TestRMSE(RUL)=125.38388
Epoch 026 | TrainLoss=11351.80859 | TrainRMSE(SOH)=71.43067 | TrainRMSE(RUL)=132.66977 | TestRMSE(SOH)=72.33268 | TestRMSE(RUL)=125.17164
Epoch 027 | TrainLoss=11304.30762 | TrainRMSE(SOH)=71.15105 | TrainRMSE(RUL)=132.46182 | TestRMSE(SOH)=72.00053 | TestRMSE(RUL)=124.91908
Epoch 028 | TrainLoss=11248.14453 | TrainRMSE(SOH)=70.82057 | TrainRMSE(RUL)=132.21468 | TestRMSE(SOH)=71.60679 | TestRMSE(RUL)=124.61845
Epoch 029 | TrainLoss=11181.68457 

In [16]:
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import numpy as np

def rmse(y_true, y_pred):
    return np.sqrt(mean_squared_error(y_true, y_pred))

def mae(y_true, y_pred):
    return mean_absolute_error(y_true, y_pred)

def mape(y_true, y_pred, eps=1e-6):
    y_true = np.array(y_true)
    y_pred = np.array(y_pred)
    return np.mean(np.abs((y_pred - y_true) / (y_true + eps))) * 100

def nrmse(y_true, y_pred):
    return rmse(y_true, y_pred) / (np.max(y_true) - np.min(y_true))

def accuracy_from_mape(mape_value):
    """Converts regression to accuracy-like score."""
    return (1 - (mape_value / 100)) * 100

def print_metrics(title, y_true, y_pred):
    print(f"\n===== {title} =====")
    r = rmse(y_true, y_pred)
    a = mae(y_true, y_pred)
    m = mape(y_true, y_pred)
    r2 = r2_score(y_true, y_pred)
    nr = nrmse(y_true, y_pred)
    acc = accuracy_from_mape(m)

    print(f"RMSE      : {r:.6f}")
    print(f"MAE       : {a:.6f}")
    print(f"MAPE      : {m:.4f}%")
    print(f"R² Score  : {r2:.6f}")
    print(f"NRMSE     : {nr:.6f}")
    print(f"Accuracy* : {acc:.3f}%   (1 - MAPE)")


In [17]:
print("\n\n==================== FINAL PINN-GNN METRICS ====================")

print_metrics("SOH (State of Health)", true_soh, pred_soh)
print_metrics("RUL (Remaining Useful Life)", true_rul, pred_rul)

print("\n===============================================================")





===== SOH (State of Health) =====
RMSE      : 16.388976
MAE       : 14.941216
MAPE      : 21.6187%
R² Score  : -0.066427
NRMSE     : 0.301274
Accuracy* : 78.381%   (1 - MAPE)

===== RUL (Remaining Useful Life) =====
RMSE      : 27.930319
MAE       : 22.790058
MAPE      : 27105171.8750%
R² Score  : 0.806582
NRMSE     : 0.127536
Accuracy* : -27105071.875%   (1 - MAPE)

