<a href="https://colab.research.google.com/github/Mechanics-Mechatronics-and-Robotics/ML-2025a/blob/main/PINN_Plates.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Initialization

In [None]:
# Install dependencies for Colab
!pip install pytorch-lightning clearml

import math
import torch
import torch.nn as nn
import pytorch_lightning as pl
from pytorch_lightning.callbacks import ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger
from clearml import Task
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


from torch.utils.data import Dataset, DataLoader
from dataclasses import dataclass

In [None]:
#Enter your code here to implement Step 2 of the logging instruction as it is shown below
%env CLEARML_WEB_HOST=https://app.clear.ml/
%env CLEARML_API_HOST=https://api.clear.ml
%env CLEARML_FILES_HOST=https://files.clear.ml
%env CLEARML_API_ACCESS_KEY=ZP02U03C6V5ER4K9VWRNZT7EWA5ZTV
%env CLEARML_API_SECRET_KEY=BtA5GXZufr6QGpaqhX1GSKPTvaCt56OLqaNqUGLNoxx2Ye8Ctwbui0Ln5OXVnzUgH4I

# 🧠 Theoretical Introduction: Dimensionless PINNs for Parallel Plate Flow

We consider the steady, fully developed flow of an incompressible Newtonian fluid between two infinite parallel plates separated by distance \(h\).  
The velocity is unidirectional along \(x\), varying only in the transverse direction \(y\), so the governing equation reduces to:

$$
\mu \frac{d^2 u}{dy^2} = \frac{dp}{dx}
$$

where $\mu$ is dynamic viscosity, $\frac{dp}{dx}$ — constant pressure gradient (typically negative).

The **no-slip boundary conditions** are applied at both plates:
<!--  -->
$$
u\left(-\tfrac{h}{2}\right) = 0, \quad u\left(\tfrac{h}{2}\right) = 0
$$

---

## ✅ Nondimensionalization

We define dimensionless variables:

$$
Y = \frac{y}{h}, \qquad U = \frac{u}{U_\mathrm{ref}}, \qquad
\Pi = \frac{h^2}{\mu U_\mathrm{ref}} \left(\frac{dp}{dx}\right)
$$

Substituting into the PDE gives the **dimensionless Poisson equation**:

$$
\frac{d^2 U}{dY^2} = \Pi
$$

with **boundary conditions**:

$$
U(-\tfrac{1}{2}) = 0, \quad U(\tfrac{1}{2}) = 0
$$

The **analytic solution** in nondimensional form is:

$$
U(Y) = \frac{\Pi}{2} \left( \tfrac{1}{4} - Y^2 \right)
$$

---

## ⚙️ Physics-Informed Neural Network (PINN) Formulation

We approximate $U(Y)$ using a neural network $\hat{U}(Y)$ and enforce physics via loss minimization.

The total loss combines:

- **PDE residual loss** — enforcing the governing equation  
- **Boundary loss** — enforcing no-slip at $Y = \pm \tfrac{1}{2}$

$$
\mathcal{L} =
\underbrace{
\frac{1}{N_r} \sum_{i=1}^{N_r}
\left|
\frac{d^2 \hat{U}}{dY^2}(Y_i) - \Pi
\right|^2
}_{\text{Physics loss}}
+
\underbrace{
\frac{1}{N_b} \sum_{j=1}^{N_b}
\left|
\hat{U}(Y_j) - U_j
\right|^2
}_{\text{Boundary loss}}
$$

# Config

In [None]:
torch.set_default_dtype(torch.float32)
pl.seed_everything(42)

# ----------------------------
# Config
# ----------------------------
@dataclass
class Config:
    # Physics parameters for dimensionalization
    h: float = 0.001               # channel height [m]
    mu: float = 1e-6               # dynamic viscosity [Pa·s]
    dpdx: float = -100000.0        # pressure gradient [Pa/m]
    U_ref: float = 0.1             # reference velocity [m/s]

    # training hyperparams
    lr: float = 1e-3
    epochs: int = 1000
    n_colloc_per_Y: int = 64       # spatial points per batch
    n_boundary_per_Y: int = 2      # boundary points (fixed: Y=-0.5, 0.5)
    batch_size_pi: int = 20         # number of Pi values per batch
    n_pi_total: int = 100          # total number of Pi values
    pi_min: float = 5.0
    pi_max: float = 5.0

    # data split ratios
    train_ratio: float = 0.6
    val_ratio: float = 0.2
    test_ratio: float = 0.2

    # network
    layers: tuple = (2, 64, 64, 64, 64, 1)

    # settings
    check_val_every_n_epoch: int = 100
    task_name: str = "PINN_plates_dimless_generalized"

cfg = Config()

# Dataset and Dataloaders

In [None]:
# ----------------------------
# Dataset Classes
# ----------------------------
class PiSpatialDataset(Dataset):
    """Dataset that returns batches of spatial points for specific Pi values"""
    def __init__(self, pi_values, n_colloc_per_Y, n_boundary_per_Y, device):
        self.pi_values = pi_values
        self.n_colloc_per_Y = n_colloc_per_Y
        self.n_boundary_per_Y = n_boundary_per_Y
        self.device = device

        # Precompute spatial grids
        self.Y_colloc = torch.linspace(-0.5, 0.5, n_colloc_per_Y, device=device).reshape(-1, 1)
        self.Y_boundary = torch.tensor([[-0.5], [0.5]], device=device, dtype=torch.float32)

    def __len__(self):
        return len(self.pi_values)

    def __getitem__(self, idx):
        pi = self.pi_values[idx]

        # Collocation points
        Y_colloc_batch = self.Y_colloc
        Pi_colloc_batch = torch.full_like(Y_colloc_batch, pi)
        X_colloc = torch.cat([Y_colloc_batch, Pi_colloc_batch], dim=1)

        # Boundary points
        Y_boundary_batch = self.Y_boundary
        Pi_boundary_batch = torch.full_like(Y_boundary_batch, pi)
        X_boundary = torch.cat([Y_boundary_batch, Pi_boundary_batch], dim=1)
        U_boundary = torch.zeros_like(Y_boundary_batch)

        return X_colloc, X_boundary, U_boundary

class PiBatchDataModule(pl.LightningDataModule):
    def __init__(self, cfg: Config, device="cpu"):
        super().__init__()
        self.cfg = cfg
        self.device = device

    def setup(self, stage=None):
        # Generate Pi values
        n_total = self.cfg.n_pi_total
        self.pi_all = torch.linspace(
            self.cfg.pi_min, self.cfg.pi_max, n_total, device=self.device
        )

        # Split into train/val/test
        n_train = int(n_total * self.cfg.train_ratio)
        n_val = int(n_total * self.cfg.val_ratio)
        n_test = n_total - n_train - n_val

        # Shuffle indices
        indices = torch.randperm(n_total)
        train_idx = indices[:n_train]
        val_idx = indices[n_train:n_train + n_val]
        test_idx = indices[n_train + n_val:]

        self.pi_train = self.pi_all[train_idx]
        self.pi_val = self.pi_all[val_idx]
        self.pi_test = self.pi_all[test_idx]

        print(f"Dataset sizes - Train: {len(self.pi_train)}, Val: {len(self.pi_val)}, Test: {len(self.pi_test)}")

    def train_dataloader(self):
        dataset = PiSpatialDataset(
            self.pi_train,
            self.cfg.n_colloc_per_Y,
            self.cfg.n_boundary_per_Y,
            self.device
        )
        return DataLoader(dataset, batch_size=self.cfg.batch_size_pi,
                          num_workers=0, shuffle=True)

    def val_dataloader(self):
        dataset = PiSpatialDataset(
            self.pi_val,
            self.cfg.n_colloc_per_Y,
            self.cfg.n_boundary_per_Y,
            self.device
        )
        return DataLoader(dataset, batch_size=self.cfg.batch_size_pi,
                          num_workers=0, shuffle=False)

    def test_dataloader(self):
        dataset = PiSpatialDataset(
            self.pi_test,
            self.cfg.n_colloc_per_Y,
            self.cfg.n_boundary_per_Y,
            self.device
        )
        return DataLoader(dataset, batch_size=self.cfg.batch_size_pi, shuffle=False)

# Model

In [None]:
task = Task.init(
    project_name="PINN_Project",   # choose a descriptive project name
    task_name=cfg.task_name,  # descriptive task name
)

# task.connect(params)
tb_logger = TensorBoardLogger(
    save_dir="logs",  # ClearML will automatically monitor this
    name=cfg.task_name,
    # version=f"bs_{HP['batch_size']}_lr_{HP['lr_simclr']}"
  )

# ----------------------------
# PINN Model
# ----------------------------
class PINN(pl.LightningModule):
    def __init__(self, layers, lr):
        super().__init__()
        self.save_hyperparameters()
        self.lr = lr
        self.net = self._build_mlp(layers)

    def _build_mlp(self, sizes):
        layers = []
        for i in range(len(sizes)-1):
            layers.append(nn.Linear(sizes[i], sizes[i+1]))
            if i < len(sizes)-2:
                layers.append(nn.Tanh())
        return nn.Sequential(*layers)

    def forward(self, X):
        return self.net(X)

    def _second_deriv_wrt_Y(self, X):
        X_temp = X.clone().requires_grad_(True)
        U = self.forward(X_temp)

        # First derivative wrt Y
        dU_dY = torch.autograd.grad(
            U, X_temp,
            grad_outputs=torch.ones_like(U),
            create_graph=True,
            retain_graph=True
        )[0][:, 0:1]

        # Second derivative wrt Y
        d2U_dY2 = torch.autograd.grad(
            dU_dY, X_temp,
            grad_outputs=torch.ones_like(dU_dY),
            create_graph=True,
            retain_graph=True
        )[0][:, 0:1]

        return d2U_dY2

    def training_step(self, batch, batch_idx):
        X_colloc, X_bc, U_bc = batch

        # Reshape batches: [batch_size_pi * n_points, 2]
        batch_size_pi = X_colloc.shape[0]
        n_colloc = X_colloc.shape[1]
        n_bc = X_bc.shape[1]

        X_colloc_flat = X_colloc.reshape(-1, 2)
        X_bc_flat = X_bc.reshape(-1, 2)
        U_bc_flat = U_bc.reshape(-1, 1)

        # PDE residual loss
        res = self._pde_residual(X_colloc_flat)
        pde_loss = torch.mean(res**1)

        # Boundary loss
        U_pred_bc = self.forward(X_bc_flat)
        bc_loss = nn.functional.mse_loss(U_pred_bc, U_bc_flat)

        # NEW: Physical constraint - velocity should be positive for positive Pi
        # U_pred_colloc = self.forward(X_colloc_flat)
        # Pi_values = X_colloc_flat[:, 1:2]
        # positive_pi_mask = (Pi_values > 0).float()
        # negative_velocity_penalty = torch.relu(-U_pred_colloc)  # Only active when U < 0
        # physical_constraint_loss = torch.mean(positive_pi_mask * negative_velocity_penalty)

        loss = pde_loss + bc_loss
        #  + 0.01 * physical_constraint_loss
        self.log_dict({
            "train_pde": pde_loss,
            "train_bc": bc_loss,
            # "train_pc": physical_constraint_loss,
            "train_loss": loss
        }, prog_bar=True)
        return loss

    def _pde_residual(self, X_colloc):
        U_YY = self._second_deriv_wrt_Y(X_colloc)
        Pi = X_colloc[:, 1:2]
        res = U_YY - Pi
        return res

    def validation_step(self, batch, batch_idx):
        X_colloc, X_bc, U_bc = batch

        # Flatten batches
        X_colloc_flat = X_colloc.reshape(-1, 2)
        X_bc_flat = X_bc.reshape(-1, 2)
        U_bc_flat = U_bc.reshape(-1, 1)

        with torch.enable_grad():
            res = self._pde_residual(X_colloc_flat)
            pde_loss = torch.mean(res**1)
            U_pred_bc = self.forward(X_bc_flat)
            bc_loss = nn.functional.mse_loss(U_pred_bc, U_bc_flat)
            val_loss = pde_loss + bc_loss

        self.log_dict({
            "val_pde": pde_loss,
            "val_bc": bc_loss,
            "val_loss": val_loss
        }, prog_bar=True)
        return val_loss

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.lr, weight_decay=1e-5)

# ----------------------------
# Dimensionalization Utilities
# ----------------------------
def dimensionless_to_dimensional(U_dim, Y_dim, Pi, cfg):
    """Convert dimensionless results to dimensional"""
    # Velocity: u = U * U_ref
    u_dimensional = U_dim * cfg.U_ref

    # Position: y = Y * h
    y_dimensional = Y_dim * cfg.h

    # Pressure gradient recovery: dp/dx = (Π * μ * U_ref) / h²
    dpdx_dimensional = (Pi * cfg.mu * cfg.U_ref) / (cfg.h**2)

    return u_dimensional, y_dimensional, dpdx_dimensional

def analytical_solution_dimensional(y, dpdx, mu, h):
    """Analytical solution for dimensional velocity"""
    # u(y) = (1/(2μ)) * (dp/dx) * (y² - (h/2)²)
    return (1/(2*mu)) * dpdx * (y**2 - (h/2)**2)

# ----------------------------
# Training
# ----------------------------
device = "cuda" if torch.cuda.is_available() else "cpu"
dm = PiBatchDataModule(cfg, device=device)
dm.setup()

model = PINN(layers=cfg.layers, lr=cfg.lr)

ckpt_cb = ModelCheckpoint(
    dirpath="checkpoints",
    filename="pinn_best",
    monitor="val_loss",
    save_top_k=1,
    mode="min",
)

trainer = pl.Trainer(
    max_epochs=cfg.epochs,
    accelerator="auto",
    callbacks=[ckpt_cb],
    log_every_n_steps=50,
    check_val_every_n_epoch=cfg.check_val_every_n_epoch
)

trainer.fit(model, datamodule=dm)


In [None]:
# ----------------------------
# Testing and Results
# ----------------------------
best_model = PINN.load_from_checkpoint(ckpt_cb.best_model_path, layers=cfg.layers, lr=cfg.lr)
best_model.eval()
best_model = best_model.to(device)

# Test on various Pi values
test_pi_values = [1.0, 3.0, 5.0, 7.0, 10.0]
Y_test = torch.linspace(-0.5, 0.5, 100, device=device).reshape(-1, 1)

# Results tables
dimensionless_results = []
dimensional_results = []

plt.figure(figsize=(15, 6))

# Plot 1: Dimensionless results
plt.subplot(1, 2, 1)
for pi in test_pi_values:
    Pi_test = torch.full_like(Y_test, pi)
    X_test = torch.cat([Y_test, Pi_test], dim=1)

    with torch.no_grad():
        U_pred_dimless = best_model(X_test).cpu().numpy().flatten()

    U_analytical_dimless = (pi/2.0) * (0.25 - Y_test.cpu().numpy().flatten()**2)

    # Dimensionless errors
    mse_dimless = np.mean((U_pred_dimless - U_analytical_dimless)**2)
    max_error_dimless = np.max(np.abs(U_pred_dimless - U_analytical_dimless))

    dimensionless_results.append({
        "Pi": pi,
        "MSE": mse_dimless,
        "Max_Error": max_error_dimless,
        "U_pred_center": U_pred_dimless[50],  # Y=0
        "U_analytical_center": U_analytical_dimless[50],
        "Error_center": abs(U_pred_dimless[50] - U_analytical_dimless[50])
    })

    plt.plot(Y_test.cpu().numpy(), U_pred_dimless, '--', linewidth=2, label=f'PINN Π={pi}')
    plt.plot(Y_test.cpu().numpy(), U_analytical_dimless, '-', alpha=0.7, label=f'Analytical Π={pi}')

plt.xlabel('Y (Dimensionless)')
plt.ylabel('U (Dimensionless)')
plt.title('Dimensionless Velocity Profiles')
plt.legend()
plt.grid(True)

# Plot 2: Dimensional results
plt.subplot(1, 2, 2)
for pi in test_pi_values:
    Pi_test = torch.full_like(Y_test, pi)
    X_test = torch.cat([Y_test, Pi_test], dim=1)

    with torch.no_grad():
        U_pred_dimless = best_model(X_test).cpu().numpy().flatten()

    # Convert to dimensional
    u_pred_dimensional, y_dimensional, dpdx_recovered = dimensionless_to_dimensional(
        U_pred_dimless, Y_test.cpu().numpy().flatten(), pi, cfg
    )

    # Analytical solution in dimensional form
    u_analytical_dimensional = analytical_solution_dimensional(
        y_dimensional, dpdx_recovered, cfg.mu, cfg.h
    )

    # Dimensional errors
    mse_dimensional = np.mean((u_pred_dimensional - u_analytical_dimensional)**2)
    max_error_dimensional = np.max(np.abs(u_pred_dimensional - u_analytical_dimensional))

    dimensional_results.append({
        "Pi": pi,
        "dpdx_recovered [Pa/m]": dpdx_recovered,
        "MSE [m²/s²]": mse_dimensional,
        "Max_Error [m/s]": max_error_dimensional,
        "u_pred_center [m/s]": u_pred_dimensional[50],
        "u_analytical_center [m/s]": u_analytical_dimensional[50],
        "Error_center [m/s]": abs(u_pred_dimensional[50] - u_analytical_dimensional[50])
    })

    plt.plot(y_dimensional, u_pred_dimensional, '--', linewidth=2, label=f'PINN Π={pi}')
    plt.plot(y_dimensional, u_analytical_dimensional, '-', alpha=0.7, label=f'Analytical Π={pi}')

plt.xlabel('y [m]')
plt.ylabel('u [m/s]')
plt.title('Dimensional Velocity Profiles')
plt.legend()
plt.grid(True)

plt.tight_layout()
plt.show()

# Print results tables
print("\n" + "="*80)
print("DIMENSIONLESS RESULTS")
print("="*80)
df_dimensionless = pd.DataFrame(dimensionless_results)
pd.set_option("display.float_format", "{:.6e}".format)
print(df_dimensionless.to_string(index=False))

print("\n" + "="*80)
print("DIMENSIONAL RESULTS")
print("="*80)
df_dimensional = pd.DataFrame(dimensional_results)
print(df_dimensional.to_string(index=False))

# Summary statistics
print("\n" + "="*80)
print("SUMMARY STATISTICS")
print("="*80)
print(f"Training Pi range: {cfg.pi_min} to {cfg.pi_max}")
print(f"Test Pi values: {test_pi_values}")
print(f"\nDimensionless Max Error across all tests: {max(r['Max_Error'] for r in dimensionless_results):.2e}")
print(f"Dimensional Max Error across all tests: {max(r['Max_Error [m/s]'] for r in dimensional_results):.2e} m/s")

# Test on validation and test sets
print("\n" + "="*80)
print("VALIDATION AND TEST SET PERFORMANCE")
print("="*80)

def evaluate_dataset(pi_values, dataset_name):
    total_loss = 0
    with torch.no_grad():
        for pi in pi_values[:5]:  # Evaluate on first 5 values for brevity
            Pi_test = torch.full_like(Y_test, pi.item())
            X_test = torch.cat([Y_test, Pi_test], dim=1)

            U_pred_dimless = (best_model(X_test).cpu().numpy().flatten())
            U_analytical_dimless = (pi.item()/2.0) * (0.25 - Y_test.cpu().numpy().flatten()**2)

            mse = np.mean((U_pred_dimless - U_analytical_dimless)**2)
            total_loss += mse

    avg_loss = total_loss / min(5, len(pi_values))
    print(f"{dataset_name} set average MSE: {avg_loss:.2e}")

evaluate_dataset(dm.pi_train, "Train")
evaluate_dataset(dm.pi_val, "Validation")
evaluate_dataset(dm.pi_test, "Test")

In [None]:
task.close()