# Integrated PINN + Validator — NACA0012 Dataset Airfoil
This notebook uses Nvidia PhysicsNeMo along with customisable PyTorch PINN code. Validator runs every 100 epochs and saves PDF reports. This can be run locally or on the google colab 


A Colab-ready PINN model for NACA0012 that use PhysicsNeMo’s NavierStokes helper when available or fall back to a PyTorch autodiff residual implementation

A validator that runs automatically every 100 epochs, evaluates the trained PINN on a grid, computes:

mass-conservation mean absolute divergence,

airfoil no-penetration RMS & max,

pressure-only Lift & Drag (per unit span),

and writes a PDF “Proof of Simulation” containing metrics, streamlines, and Cp.

Checkpointing at each validation epoch (chcks folder)

In [None]:
import torch, sys
print('Torch', torch.__version__, 'CUDA available?', torch.cuda.is_available())
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('Device:', device)

PhysicsNeMo detection

In [None]:
HAS_PNM = False
try:
    import physicsnemo
    from physicsnemo.sym.eq.pdes import navier_stokes as pn_ns
    HAS_PNM = True
    print('PhysicsNeMo found:', getattr(physicsnemo,'__version__','unknown'))
except Exception as e:
    print('PhysicsNeMo not found; falling back to PyTorch PINN. Error:', e)

NACA 0012 Geometry & Sampling

![Sample Image](Airfoil_lines.png)

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from matplotlib.path import Path
plt.rcParams['figure.figsize'] = (8,4)

def naca_coords(t=0.12, n=600):
    beta = np.linspace(0, np.pi, n)
    x = 0.5*(1 - np.cos(beta))
    yt = 5*t*(0.2969*np.sqrt(x)-0.1260*x-0.3516*x**2+0.2843*x**3-0.1015*x**4)
    xu, xl = x-yt, x+yt
    xu_rev = xu[::-1]
    xs = np.concatenate([xu_rev, xl[1:]])
    ys = np.concatenate([yt[::-1], (-yt)[1:]])
    return np.vstack([xs, ys]).T

coords = naca_coords()
path = Path(coords)
print('coords shape', coords.shape)
plt.plot(coords[:,0], coords[:,1]); plt.axis('equal'); plt.title('NACA0012'); plt.show()

Using Custom Models in PhysicsNeMo or If its not available

In [None]:
import torch.nn as nn
class MLP(nn.Module):
    def __init__(self):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(2,256), nn.Tanh(),
            nn.Linear(256,256), nn.Tanh(),
            nn.Linear(256,4)
        )
    def forward(self,x): return self.net(x)

net = MLP().to(device)
print('Network created on', device)

Residual functions (PhysicsNeMo → fallback)

In [None]:
from torch.autograd import grad
gamma = 1.4
ns_helper = None
if HAS_PNM:
    try:
        ns_helper = pn_ns.NavierStokes(viscosity=1e-5, gamma=gamma)
        print('PhysicsNeMo NavierStokes helper ready')
    except Exception as e:
        print('Could not init pn_ns.NavierStokes, will use fallback:', e)
        ns_helper = None

def compute_residuals_fallback(pts):
    xy = torch.tensor(pts, dtype=torch.float32, device=device, requires_grad=True)
    out = net(xy)
    rho = out[:,0:1]; u = out[:,1:2]; v = out[:,2:3]; p = out[:,3:4]
    E = p/(gamma-1.0) + 0.5 * rho * (u**2 + v**2)
    rhou = rho * u; rhov = rho * v
    fx1 = rhou; fx2 = rhou*u + p; fx3 = rhou*v; fx4 = u*(E + p)
    fy1 = rhov; fy2 = rhov*u; fy3 = rhov*v + p; fy4 = v*(E + p)
    def g(f): return grad(f, xy, grad_outputs=torch.ones_like(f), create_graph=True, retain_graph=True)[0]
    r1 = g(fx1)[:,0:1] + g(fy1)[:,1:2]
    r2 = g(fx2)[:,0:1] + g(fy2)[:,1:2]
    r3 = g(fx3)[:,0:1] + g(fy3)[:,1:2]
    r4 = g(fx4)[:,0:1] + g(fy4)[:,1:2]
    return r1, r2, r3, r4

print('Residual helper ready')

In [None]:
from scipy import interpolate
nx, ny = 300, 150
xmin, xmax, ymin, ymax = -1.5, 3.0, -1.5, 1.5
xs = np.linspace(xmin, xmax, nx)
ys = np.linspace(ymin, ymax, ny)
X, Y = np.meshgrid(xs, ys)
grid = np.vstack([X.ravel(), Y.ravel()]).T

def eval_grid():
    with torch.no_grad():
        out = net(torch.tensor(grid, dtype=torch.float32, device=device)).cpu().numpy()
    rho = out[:,0].reshape(ny, nx)
    u = out[:,1].reshape(ny, nx)
    v = out[:,2].reshape(ny, nx)
    p = out[:,3].reshape(ny, nx)
    return rho, u, v, p

def mass_div_fd(rho, u, v):
    rhou = rho * u; rhov = rho * v
    drx = np.gradient(rhou, xs, axis=1)
    dry = np.gradient(rhov, ys, axis=0)
    return float(np.mean(np.abs(drx + dry)))

def bc_no_pen(u_grid, v_grid, surface_pts):
    f_u = interpolate.RegularGridInterpolator((ys, xs), u_grid)
    f_v = interpolate.RegularGridInterpolator((ys, xs), v_grid)
    uu = np.array([f_u((pt[1], pt[0])) for pt in surface_pts]).flatten()
    vv = np.array([f_v((pt[1], pt[0])) for pt in surface_pts]).flatten()
    pts = surface_pts
    tang = np.zeros_like(pts)
    for i in range(len(pts)):
        i0 = (i-1) % len(pts); i1 = (i+1) % len(pts)
        tg = pts[i1] - pts[i0]
        tg = tg / (np.linalg.norm(tg) + 1e-12)
        tang[i] = tg
    normals = np.zeros_like(tang); normals[:,0] = -tang[:,1]; normals[:,1] = tang[:,0]
    vdotn = uu * normals[:,0] + vv * normals[:,1]
    return float(np.mean(np.abs(vdotn))), float(np.max(np.abs(vdotn)))

def pressure_forces(p_grid, surface_pts):
    f_p = interpolate.RegularGridInterpolator((ys, xs), p_grid)
    p_surf = np.array([f_p((pt[1], pt[0])) for pt in surface_pts]).flatten()
    pts = surface_pts
    tang = np.zeros_like(pts)
    for i in range(len(pts)):
        i0 = (i-1) % len(pts); i1 = (i+1) % len(pts)
        tg = pts[i1] - pts[i0]
        tg = tg / (np.linalg.norm(tg) + 1e-12)
        tang[i] = tg
    normals = np.zeros_like(tang); normals[:,0] = -tang[:,1]; normals[:,1] = tang[:,0]
    seg_lengths = np.linalg.norm(np.roll(pts, -1, axis=0) - pts, axis=1)
    force_segments = (-p_surf[:,None] * normals) * seg_lengths[:,None]
    F_total = force_segments.sum(axis=0)
    Drag = -F_total[0]; Lift = -F_total[1]
    q_inf = 0.5 * 1.225 * (1.2 * ((gamma*287.058*288.15)**0.5))**2
    Cl = Lift / (q_inf * 1.0); Cd = Drag / (q_inf * 1.0)
    return {'Lift': float(Lift), 'Drag': float(Drag), 'Cl': float(Cl), 'Cd': float(Cd)}

print('Eval & validator helpers ready')

Evaluation & validation helpers, PDF report generator

In [None]:
from matplotlib.backends.backend_pdf import PdfPages
import datetime, json, sys

def save_pdf_report(metrics, rho, u, v, p, surface_pts, filename='validation_report.pdf'):
    q_inf = 0.5 * 1.225 * (1.2 * ((gamma*287.058*288.15)**0.5))**2
    cp = (p - p.mean()) / q_inf
    with PdfPages(filename) as pdf:
        fig, ax = plt.subplots(figsize=(8.27, 11.69))
        ax.axis('off')
        lines = []
        lines.append('CFD Validation Report — PINN NACA0012')
        lines.append('Date (UTC): ' + datetime.datetime.utcnow().isoformat())
        lines.append('')
        for k, v in metrics.items():
            lines.append(f'{k}: {v}')
        meta = {'python': sys.version.split('\n')[0], 'torch': torch.__version__, 'physicsnemo': HAS_PNM}
        lines.append('')
        lines.append('Environment:')
        lines.append(json.dumps(meta, indent=2))
        ax.text(0.01, 0.99, '\n'.join(lines), va='top', ha='left', fontsize=9, family='monospace')
        pdf.savefig(fig); plt.close(fig)

        fig, ax = plt.subplots(figsize=(8,4))
        ax.streamplot(np.linspace(xmin, xmax, u.shape[1]), np.linspace(ymin, ymax, u.shape[0]), u, v, density=1.2)
        ax.plot(surface_pts[:,0], surface_pts[:,1], 'k')
        ax.set_title('Streamlines')
        ax.set_aspect('equal')
        pdf.savefig(fig); plt.close(fig)

        fig, ax = plt.subplots(figsize=(8,4))
        Xc, Yc = np.meshgrid(np.linspace(xmin, xmax, cp.shape[1]), np.linspace(ymin, ymax, cp.shape[0]))
        cf = ax.contourf(Xc, Yc, cp, levels=40)
        fig.colorbar(cf, ax=ax)
        ax.plot(surface_pts[:,0], surface_pts[:,1], 'k')
        ax.set_title('Pressure coefficient Cp')
        ax.set_aspect('equal')
        pdf.savefig(fig); plt.close(fig)

    print('Saved PDF report to', filename)
    return filename

print('PDF helper ready')

Complete Training loop + Validator (100 epoch interval)

In [None]:
import torch.optim as optim, os
opt = optim.Adam(net.parameters(), lr=1e-4)

n_epochs = 1000
validator_interval = 100
ckpt_dir = 'ckpts'
os.makedirs(ckpt_dir, exist_ok=True)

for epoch in range(1, n_epochs+1):
    net.train()
    pts = sample_interior(2048)
    if ns_helper is not None:
        try:
            xy = torch.tensor(pts, dtype=torch.float32, device=device, requires_grad=True)
            out = net(xy)
            rho = out[:,0:1]; u = out[:,1:2]; v = out[:,2:3]; p = out[:,3:4]
            res = ns_helper.compute_residuals(xy, rho, u, v, p)
        except Exception:
            res = compute_residuals_fallback(pts)
    else:
        res = compute_residuals_fallback(pts)

    if isinstance(res, (list, tuple)):
        loss_pde = sum([r.pow(2).mean() for r in res])
    else:
        loss_pde = res.pow(2).mean()

    # BC loss (no-penetration)
    surf = sample_airfoil(400)
    surf_t = torch.tensor(surf, dtype=torch.float32, device=device)
    out_s = net(surf_t)
    u_s = out_s[:,1:2]; v_s = out_s[:,2:3]
    tang = np.zeros_like(surf)
    for i in range(len(surf)):
        i0 = (i-1) % len(surf); i1 = (i+1) % len(surf)
        tg = surf[i1] - surf[i0]
        tg = tg / (np.linalg.norm(tg) + 1e-12)
        tang[i] = tg
    normals = np.zeros_like(tang); normals[:,0] = -tang[:,1]; normals[:,1] = tang[:,0]
    normals_t = torch.tensor(normals, dtype=torch.float32, device=device)
    vel_s = torch.cat([u_s, v_s], dim=1)
    normal_dot = (vel_s * normals_t).sum(dim=1, keepdim=True)
    loss_bc = (normal_dot**2).mean()

    loss = loss_pde + 200.0 * loss_bc
    opt.zero_grad(); loss.backward(); opt.step()

    if epoch % 50 == 0:
        print(f'Epoch {epoch}/{n_epochs}  loss={loss.item():.3e} pde={loss_pde.item():.3e} bc={loss_bc.item():.3e}')

    if epoch % validator_interval == 0 or epoch == n_epochs:
        print('\nValidator at epoch', epoch)
        net.eval()
        rho_g, u_g, v_g, p_g = eval_grid()
        mc = mass_div_fd(rho_g, u_g, v_g)
        bc_mean, bc_max = bc_no_pen(u_g, v_g, coords)
        forces = pressure_forces(p_g, coords)
        metrics = {'epoch': epoch, 'mass_div_mean': float(mc), 'bc_no_pen_mean': float(bc_mean), 'bc_no_pen_max': float(bc_max)}
        metrics.update(forces)
        pdf_name = f'validation_report_epoch_{epoch}.pdf'
        save_pdf_report(metrics, rho_g, u_g, v_g, p_g, coords, filename=pdf_name)
        ckpt_path = os.path.join(ckpt_dir, f'ckpt_epoch_{epoch}.pth')
        torch.save({'epoch': epoch, 'model_state': net.state_dict()}, ckpt_path)
        print('Saved checkpoint and PDF report for epoch', epoch, '\n')

print('Training + validation finished')