# Pressure Vessel: FEniCS + POD + Physics-Informed NN (Full Elasticity)

This notebook demonstrates an end-to-end **open-source** pipeline:
1) **Mesh generation** with Gmsh → 2) **FEniCS** simulations → 3) **POD** reduction →
4) **PINN** surrogate with **full 3D elasticity residual** → 5) **Error analysis** → 6) **ParaView exports**.

**Tip:** Run on a small mesh first (already set) to validate the pipeline, then refine.


## 0. Setup: installs (optional) and imports
Short description: install required libraries if you don't have them, then import everything we need.

In [None]:
# OPTIONAL installs (uncomment if needed)
# %pip install gmsh meshio fenics torch matplotlib scikit-learn

import numpy as np
import matplotlib.pyplot as plt
import torch
import torch.nn as nn
import torch.optim as optim
from sklearn.model_selection import train_test_split

import gmsh
import meshio
from dolfin import *

print('✅ Imports ready')


## 1. Mesh generation with Gmsh (small mesh for a quick demo)
Short description: build a **hollow cylinder** (pressure vessel section) and tag the inner wall as a physical surface.

In [None]:
gmsh.initialize()
gmsh.model.add('small_vessel')

# Geometry parameters (small for speed)
R_outer = 0.5   # m
L = 1.0         # m
th = 0.05       # m (wall thickness)
R_inner = R_outer - th
lc = 0.15       # coarse element size

# Outer and inner cylinders
outer = gmsh.model.occ.addCylinder(0, 0, 0, 0, 0, L, R_outer)
inner = gmsh.model.occ.addCylinder(0, 0, 0, 0, 0, L, R_inner)
vessel, _ = gmsh.model.occ.cut([(3, outer)], [(3, inner)], removeObject=True, removeTool=True)
gmsh.model.occ.synchronize()

# Tag volume
gmsh.model.addPhysicalGroup(3, [vessel[0][1]], tag=1)
gmsh.model.setPhysicalName(3, 1, 'Vessel')

# Tag inner wall (pressure boundary)
inner_faces = gmsh.model.getBoundary([(3, vessel[0][1])], oriented=False)
inner_surf_ids = [ent[1] for ent in inner_faces]
gmsh.model.addPhysicalGroup(2, inner_surf_ids, tag=2)
gmsh.model.setPhysicalName(2, 2, 'InnerWall')

gmsh.option.setNumber('Mesh.CharacteristicLengthMin', lc)
gmsh.option.setNumber('Mesh.CharacteristicLengthMax', lc)
gmsh.model.mesh.generate(3)
gmsh.write('vessel.msh')
gmsh.finalize()
print('✅ Gmsh mesh written to vessel.msh')


## 2. Convert mesh to XDMF for FEniCS
Short description: use **meshio** to write separate XDMF files for volume and facet markers.

In [None]:
msh = meshio.read('vessel.msh')
points = msh.points
cells_tet = msh.get_cells_type('tetra')
cells_tri = msh.get_cells_type('triangle')

# Volume mesh with tetrahedra
meshio.write('vessel_mesh.xdmf', meshio.Mesh(
    points=points,
    cells={'tetra': cells_tet},
    cell_data={'name_to_read': [msh.cell_data_dict['gmsh:physical']['tetra']]}
))

# Facet markers for boundary conditions
meshio.write('vessel_facets.xdmf', meshio.Mesh(
    points=points,
    cells={'triangle': cells_tri},
    cell_data={'name_to_read': [msh.cell_data_dict['gmsh:physical']['triangle']]}
))
print('✅ XDMF written: vessel_mesh.xdmf, vessel_facets.xdmf')


## 3. FEniCS solver for linear elasticity with internal pressure
Short description: assemble and solve the static elasticity problem with a **clamped end (z=0)** and **pressure on inner wall**.

In [None]:
mesh = Mesh()
with XDMFFile('vessel_mesh.xdmf') as f:
    f.read(mesh)

mvc = MeshValueCollection('size_t', mesh, 2)
with XDMFFile('vessel_facets.xdmf') as f:
    f.read(mvc, 'name_to_read')
facet_markers = cpp.mesh.MeshFunctionSizet(mesh, mvc)

V = VectorFunctionSpace(mesh, 'Lagrange', 1)

def solve_case(E_val, p_val, nu=0.3, inner_marker=2):
    mu = E_val / (2*(1+nu))
    lam = (E_val*nu) / ((1+nu)*(1-2*nu))
    
    def sigma(u):
        return 2*mu*sym(grad(u)) + lam*tr(sym(grad(u)))*Identity(3)
    
    u = TrialFunction(V)
    v = TestFunction(V)
    a = inner(sigma(u), sym(grad(v)))*dx
    
    # Pressure as traction: -p * n on inner surface
    n = FacetNormal(mesh)
    ds = Measure('ds', domain=mesh, subdomain_data=facet_markers)
    L = dot(-Constant(p_val)*n, v)*ds(inner_marker)
    
    # Clamp z=0 end
    class Clamp(SubDomain):
        def inside(self, x, on_boundary):
            return on_boundary and near(x[2], 0.0, DOLFIN_EPS)
    bc = DirichletBC(V, Constant((0,0,0)), Clamp())
    
    u_sol = Function(V)
    solve(a == L, u_sol, bc)
    return u_sol

print('✅ FEniCS solver ready')


## 4. Parametric sweep & snapshot collection
Short description: run a small grid over **E** and **p**, store flattened displacement vectors for POD.

In [None]:
E_vals = np.linspace(1.0e9, 5.0e9, 5)  # Pa (small range for demo)
p_vals = np.linspace(1.0e5, 5.0e5, 5)  # Pa
snapshots = []
param_list = []

for E in E_vals:
    for p in p_vals:
        u_sol = solve_case(E, p)
        snapshots.append(u_sol.vector().get_local())
        param_list.append([E, p])

S = np.array(snapshots).T  # shape: (ndof, N)
params = np.array(param_list)  # shape: (N, 2)
print('Snapshot matrix shape:', S.shape)


## 5. POD basis (SVD) and coefficient extraction
Short description: center snapshots, compute SVD, pick **r** modes, and get POD coefficients.

In [None]:
mean_u = S.mean(axis=1, keepdims=True)
X = S - mean_u
U_svd, sing, VT = np.linalg.svd(X, full_matrices=False)
energy = np.cumsum(sing**2) / np.sum(sing**2)
r = min(10, np.searchsorted(energy, 0.999) + 1)  # keep up to 10 modes or 99.9% energy
Phi = U_svd[:, :r]   # (ndof, r)
A = Phi.T @ X        # (r, N) POD coefficients
print(f'Chosen r={r}; Coeff matrix shape:', A.shape)


## 6. Prepare training data (normalize inputs)
Short description: split into train/test; normalize **(E,p)** for better NN conditioning.

In [None]:
X_in = params.astype(np.float64)
y_out = A.T.astype(np.float64)

X_mean = X_in.mean(axis=0)
X_std = X_in.std(axis=0) + 1e-12
Xn = (X_in - X_mean) / X_std

X_train, X_test, y_train, y_test = train_test_split(Xn, y_out, test_size=0.2, random_state=42)

X_train_t = torch.tensor(X_train, dtype=torch.float32)
y_train_t = torch.tensor(y_train, dtype=torch.float32)
X_test_t  = torch.tensor(X_test, dtype=torch.float32)
y_test_t  = torch.tensor(y_test, dtype=torch.float32)
print('Train set:', X_train_t.shape, y_train_t.shape)


## 7. Define the PINN model (params → POD coefficients)
Short description: small fully-connected NN with Tanh activations.

In [None]:
class POD_PINN(nn.Module):
    def __init__(self, n_in=2, n_out=r, width=32):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(n_in, width), nn.Tanh(),
            nn.Linear(width, width), nn.Tanh(),
            nn.Linear(width, n_out)
        )
    def forward(self, x):
        return self.net(x)

model = POD_PINN()
loss_fn = nn.MSELoss()
opt = optim.Adam(model.parameters(), lr=1e-3)
print('✅ PINN model created')


## 8. Full 3D elasticity physics residual for PINN
Short description: compute **div σ(u)** using autograd on a subset of mesh nodes. We reconstruct **u** via POD modes.

**Note:** For production, build a DOF-to-node map from FEniCS to ensure component ordering matches `(ux, uy, uz)` per node.

In [None]:
coords = mesh.coordinates()  # (Nnodes, 3)
Nnodes = coords.shape[0]
ndof = Phi.shape[0]
assert ndof % 3 == 0, 'Expect 3 DOFs per node (vector field).'

# Subsample for efficiency in the demo
idx_sample = np.arange(0, Nnodes, max(1, Nnodes//800))  # ~<= 800 points
coords_sample = coords[idx_sample]
Np = coords_sample.shape[0]

# Build a simple node-wise mode matrix (assumes DOF ordering [ux0, uy0, uz0, ux1, ...])
def build_node_mode_matrix(Phi, node_indices):
    # returns Phi_node of shape (Np*3, r) corresponding to selected nodes
    rows = []
    for n in node_indices:
        rows.extend([3*n + 0, 3*n + 1, 3*n + 2])
    return Phi[rows, :]

Phi_node = build_node_mode_matrix(Phi, idx_sample)  # (Np*3, r)

def physics_residual_loss_elasticity(model, Phi_node, coords_sample, X_batch_t, nu=0.3):
    B = X_batch_t.shape[0]
    coords_t = torch.tensor(coords_sample, dtype=torch.float32).repeat(B, 1, 1)  # [B, Np, 3]
    coords_t.requires_grad_(True)
    
    # Predict POD coeffs
    a_pred = model(X_batch_t)  # [B, r]
    Phi_t = torch.tensor(Phi_node, dtype=torch.float32)  # [Np*3, r]
    disp_flat = torch.einsum('br,nr->bn', a_pred, Phi_t)  # [B, Np*3]
    disp = disp_flat.view(B, -1, 3)  # [B, Np, 3]
    
    # Material from (E,p) -> E only for residual; p handled via traction BC in data
    # We use average E in the batch for mu, lam; for more fidelity, do per-sample loop
    # Here we loop per sample to keep correctness.
    phys_loss = 0.0
    for b in range(B):
        Xb = X_batch_t[b:b+1]
        # de-normalize inputs
        Xb_denorm = Xb.detach().cpu().numpy()*X_std + X_mean
        E_val = float(Xb_denorm[0,0])
        mu = E_val/(2*(1+nu))
        lam = (E_val*nu)/((1+nu)*(1-2*nu))
        
        u = disp[b]  # [Np, 3]
        grads = []
        for i in range(3):
            gi = torch.autograd.grad(u[:, i].sum(), coords_t[b], create_graph=True)[0]  # [Np, 3]
            grads.append(gi)
        grads = torch.stack(grads, dim=2)  # [Np, 3, 3]
        eps = 0.5*(grads + grads.transpose(1, 2))          # [Np, 3, 3]
        trace_eps = torch.einsum('nii->n', eps)             # [Np]
        I = torch.eye(3)
        sigma = lam*trace_eps[:,None,None]*I + 2*mu*eps     # [Np, 3, 3]
        
        # div sigma
        div_sigma = []
        for i in range(3):
            div_i = 0
            for j in range(3):
                grad_sigma_ij = torch.autograd.grad(sigma[:, i, j].sum(), coords_t[b], create_graph=True)[0][:, j]
                div_i = div_i + grad_sigma_ij
            div_sigma.append(div_i)
        div_sigma = torch.stack(div_sigma, dim=1)  # [Np, 3]
        phys_loss = phys_loss + (div_sigma**2).mean()
    
    return phys_loss / B

print('✅ Physics residual ready (full 3D elasticity).')


## 9. Train PINN with combined loss (data + physics)
Short description: optimize the NN to fit POD coeffs **and** minimize PDE residual on sampled points.

In [None]:
alpha_data = 1.0
beta_phys = 1e-3  # increase for stronger physics enforcement
epochs = 400
batch = X_train_t.shape[0]

for ep in range(epochs):
    opt.zero_grad()
    pred = model(X_train_t)
    loss_data = loss_fn(pred, y_train_t)
    loss_phys = physics_residual_loss_elasticity(model, Phi_node, coords_sample, X_train_t)
    loss = alpha_data*loss_data + beta_phys*loss_phys
    loss.backward()
    opt.step()
    if ep % 50 == 0:
        print(f'Epoch {ep}: Data={loss_data.item():.3e}  Phys={loss_phys.item():.3e}  Total={loss.item():.3e}')
print('✅ Training complete')


## 10. Predict on a new point and compare to FEniCS
Short description: reconstruct displacement from predicted POD coeffs, compute **error**, and visualize distributions.

In [None]:
# Choose a test (E, p)
E_test, p_test = 3.0e9, 3.5e5
x_test = np.array([(E_test, p_test)], dtype=np.float64)
x_test_n = (x_test - X_mean) / X_std
x_test_t = torch.tensor(x_test_n, dtype=torch.float32)

# Predict POD coeffs and reconstruct
a_pred = model(x_test_t).detach().numpy().reshape(-1)
u_pred = (Phi @ a_pred + mean_u.reshape(-1))  # (ndof,)

# Ground truth from FEniCS
u_true = solve_case(E_test, p_test).vector().get_local()

# Error metrics
abs_err = np.abs(u_true - u_pred)
rel_err = np.linalg.norm(u_true - u_pred) / (np.linalg.norm(u_true) + 1e-14)
print(f'Relative error: {rel_err:.3e}')

# Histogram of absolute error
plt.figure(figsize=(6,4))
plt.hist(abs_err, bins=30)
plt.xlabel('Absolute displacement error [m]')
plt.ylabel('Count')
plt.title('NN vs. FEniCS Displacement Error (per DOF)')
plt.grid(alpha=0.3)
plt.show()


## 11. 3D scatter of error on nodes
Short description: visualize where the surrogate deviates most (mini-ParaView in notebook).

In [None]:
coords_all = mesh.coordinates()
err_node_mag = np.linalg.norm(abs_err.reshape(-1,3), axis=1)

fig = plt.figure(figsize=(7,6))
ax = fig.add_subplot(111, projection='3d')
pc = ax.scatter(coords_all[:,0], coords_all[:,1], coords_all[:,2], c=err_node_mag, s=10)
ax.set_xlabel('X [m]'); ax.set_ylabel('Y [m]'); ax.set_zlabel('Z [m]')
ax.set_title('Spatial Distribution of Displacement Error (|Δu|)')
fig.colorbar(pc, ax=ax, label='|Δu| [m]')
plt.show()


## 12. Save XDMF fields for ParaView
Short description: write **u_true**, **u_pred**, and scalar **error** to `.xdmf` for side-by-side comparison in ParaView.

In [None]:
def save_vector_field(filename, vec, mesh):
    Vv = VectorFunctionSpace(mesh, 'CG', 1)
    f = Function(Vv)
    f.vector().set_local(vec)
    with XDMFFile(filename) as xdmf:
        xdmf.write(f)

def save_scalar_field(filename, scal, mesh):
    Vs = FunctionSpace(mesh, 'CG', 1)
    f = Function(Vs)
    f.vector().set_local(scal)
    with XDMFFile(filename) as xdmf:
        xdmf.write(f)

# Save vector displacements
save_vector_field('u_true.xdmf', u_true, mesh)
save_vector_field('u_pred.xdmf', u_pred, mesh)

# Save scalar error per DOF and per node magnitude
save_scalar_field('error_per_dof.xdmf', abs_err, mesh)
save_scalar_field('error_node_mag.xdmf', err_node_mag, mesh)
print('✅ Saved: u_true.xdmf, u_pred.xdmf, error_per_dof.xdmf, error_node_mag.xdmf')


### Notes on libraries
- **Gmsh** builds CAD geometry and generates high-quality tetrahedral meshes with boundary tags.
- **meshio** converts between mesh formats (Gmsh `.msh` → FEniCS `.xdmf`).
- **FEniCS (dolfin)** solves PDEs using finite elements. We model linear elasticity and export fields to XDMF.
- **POD** (SVD) reduces dimensionality by extracting dominant modes from snapshots.
- **PINN (Physics-Informed NN)** augments data loss with **PDE residual loss** to enforce physics and improve generalization.

For production, consider: higher mesh resolution, robust DOF-node mapping, stress-based losses, multi-materials, and more advanced NN architectures.