In [None]:
## routine needed to run the notebook on Google Colab
try:
    import google.colab
    IN_COLAB = True
except:
    IN_COLAB = False
if IN_COLAB:
    !pip install "pina-mathlab[tutorial]"
    !pip install "ase"
    !mkdir "data"
    !wget "https://github.com/dario-coscia/KTH-Summer-School-PINNs-PINA/raw/refs/heads/main/data/ammonia.xyz" -O "data/ammonia.xyz"
    !wget "https://github.com/mathLab/PINA/raw/refs/heads/master/tutorials/tutorial7/data/pinn_solution_0.5_0.5" -O "data/pinn_solution_0.5_0.5"
    !wget "https://github.com/mathLab/PINA/raw/refs/heads/master/tutorials/tutorial7/data/pts_0.5_0.5" -O "data/pts_0.5_0.5"
    !mkdir "benchmark_ckpt"
    !wget "https://github.com/dario-coscia/KTH-Summer-School-PINNs-PINA/raw/refs/heads/main/benchmark_ckpt/PINN.ckpt" -O "benchmark_ckpt/PINN.ckpt"
    !wget "https://github.com/dario-coscia/KTH-Summer-School-PINNs-PINA/raw/refs/heads/main/benchmark_ckpt/GradientPINN.ckpt" -O "benchmark_ckpt/GradientPINN.ckpt"
    !wget "https://github.com/dario-coscia/KTH-Summer-School-PINNs-PINA/raw/refs/heads/main/benchmark_ckpt/RBAPINN.ckpt" -O "benchmark_ckpt/RBAPINN.ckpt"


import warnings
import random
import torch
import torch.nn as nn
import matplotlib.pyplot as plt
from matplotlib import cm

from lightning.pytorch.callbacks import ModelCheckpoint, Callback
from lightning.pytorch.loggers import TensorBoardLogger

from ase.io import read

from torch_geometric.utils import scatter

from pina import Condition, LabelTensor, Trainer
from pina.condition import DataCondition
from pina.problem import (AbstractProblem, TimeDependentProblem, SpatialProblem,
                          InverseProblem)
from pina.domain import CartesianDomain
from pina.graph import Graph, LabelBatch
from pina.solver import (SingleSolverInterface, SupervisedSolverInterface,
                         SupervisedSolver, PINN)
from pina.model import FeedForward
from pina.model.block import PODBlock
from pina.operator import grad, laplacian
from pina.model.block.message_passing import DeepTensorNetworkBlock
from pina.equation import Equation, FixedValue, FixedGradient
from pina.solver import PINN, GradientPINN, RBAPINN
from pina.optim import TorchOptimizer
from pina.loss import LpLoss, LossInterface

warnings.filterwarnings("ignore")


# PINA Library for Scientific Machine Learning

[![Open In Colab](https://colab.research.google.com/assets/colab-badge.svg)](https://colab.research.google.com/github/dario-coscia/Lecture-PINA/blob/main/tutorial.ipynb)


<p align="left">
    <img src="https://raw.githubusercontent.com/mathLab/PINA/master/readme/pina_logo.png" alt="PINA logo" width="90"/>
</p>


**PINA** [1] is an open-source Python library designed for **Scientific Machine Learning (SciML)** tasks, particularly involving:

- **Physics-Informed Neural Networks (PINNs)**
- **Neural Operators (NOs)**
- **Reduced Order Models (ROMs)**
- **Generative Models (GMs)**
- ...

Built on **PyTorch**, **PyTorch Lightning**, and **PyTorch Geometric**, it provides a **user-friendly, intuitive interface** for formulating and solving differential problems using neural networks.


## üëâ Lecture Outline

1. **Introduction to the Software**
    - How to build a SciML problem in PINA
    - Data Structures and Domains
    - Solvers and Trainer API     

2. **Supervised Learning**
    - Machine Learning Interatomic Potentials (*Ammonia NH3*)
    - Reduced Order Model learning (*Navier Stokes back-step*)

3. **Physics Informed Learning**
    - Train Physics Informed Networks with PINA (*Harmonic Oscillator*)
    - Benchmarking PINN variants (*Helmholtz Equation*)
    - Solving Inverse Problems (*Poisson Equation*)

> üôÅ We will not cover Neural Operators, but feel free to check the [tutorials](https://github.com/mathLab/PINA/tree/master/tutorials#neural-operator-learning) on the PINA repository.

## üëâ Introduction to the Software

<p align="center">
    <img src="http://raw.githubusercontent.com/mathLab/PINA/master/tutorials/static/pina_workflow.png" alt="PINA Workflow" width="1000"/>
</p>


**PINA** is an open-source Python library designed to simplify and accelerate the development of SciML solutions. Built on top of [PyTorch](https://pytorch.org/), [PyTorch Lightning](https://lightning.ai/docs/pytorch/stable/), and [PyTorch Geometric](https://pytorch-geometric.readthedocs.io/en/latest/), PINA provides an intuitive framework based of four main steps:

1. ***Problem & Data***
   Define the mathematical problem and its physical constraints +  Prepare inputs by discretizing the domain or importing numerical data. 

2. ***Model Design***  
   Build neural network models as **PyTorch modules**. For graph-structured data, use **PyTorch Geometric** to build Graph Neural Networks. You can also import models from `pina.model` module!

3. ***Solver Selection***  
   Choose and configure a solver to optimize your model, e.g. `SupervisedSolver`, `ReducedOrderModelSolver`, `PINN` and more...

4. ***Training***  
   Train your model using the `Trainer` class (built on **PyTorch Lightning**), which enables scalable and efficient training with advanced features.

Let's see more in details all the steps!

### 1. Problem & Data
In **PINA**, defining a problem is done by creating a Python `class` that inherits from one or more problem classes, such as `SpatialProblem`, `TimeDependentProblem`, or `ParametricProblem`, depending on the nature of the problem. A simple example is the following:

In [None]:
# creating some fictitious data
input_1 = LabelTensor(torch.randn(10, 1), "x")  # <== input_variables
input_2 = LabelTensor(torch.randn(10, 1), "x")  # <== input_variables
target_1 = LabelTensor(torch.randn(10, 1), "y")  # <== output_variables
target_2 = LabelTensor(torch.randn(10, 1), "y")  # <== output_variables


class MySupervisedProblem(AbstractProblem):

    input_variables = ["x"]
    output_variables = ["y"]

    conditions = {
        "condition_1": Condition(input=input_1, target=target_1),
        "condition_2": Condition(input=input_2, target=target_2),
    }


problem = MySupervisedProblem()

We highlight two very important features of PINA

1. **`Condition` Object**  
   - The `Condition` object enforces the **constraints** that the model must satisfy, such as boundary or initial conditions. More conditions will come in the tutorial!
   - It ensures that the model adheres to the specific requirements of the problem, making constraint handling more intuitive and streamlined.

2. **`LabelTensor` Structure**  
   - Alongside the standard `torch.Tensor`, PINA introduces the `LabelTensor` structure, which allows **string-based indexing**.  
   - Ideal for managing and stacking tensors with different labels (e.g., `"x"`, `"t"`, `"u"`) for improved clarity and organization.  
   - You can still use standard PyTorch tensors if needed.

In [None]:
# Create a label tensor containing spatial and temporal coordinates
x = torch.tensor([0.0, 1.0, 2.0, 3.0])  # Spatial coordinates
t = torch.tensor([0.0, 0.5, 1.0, 1.5])  # Time coordinates

# Combine x and t into a label tensor (2D tensor)
tensor = torch.stack([x, t], dim=-1)  # Shape: [4, 2]
print("Tensor:\n", tensor)

# Build the LabelTensor
label_tensor = LabelTensor(tensor, ["x", "t"])

print(f"Torch methods can be used, {label_tensor.shape=}")
print(f"also {label_tensor.requires_grad=} \n")
print(f'We can slice with labels: \n {label_tensor["x"]=}')
print(f"Similarly to: \n {label_tensor[:, 0]=}")

You can do more complex slicing by using the extract method. For example:

In [None]:
label_tensor = LabelTensor(
    tensor,
    {
        0: {"dof": range(4), "name": "points"},
        1: {"dof": ["x", "t"], "name": "coords"},
    },
)

print(f'Extract labels: {label_tensor.extract({"points" : [0, 2]})=}')
print(f"Similar to: {label_tensor[slice(0, 4, 2), :]=}")

Similarly, you can also work with graphs!

<p align="center">
    <img src="https://raw.githubusercontent.com/dario-coscia/KTH-Summer-School-PINNs-PINA/main/imgs/simple_graph.png" alt="Graph" width="500"/>
</p>

In [None]:
# Node features: [2 nodes, 3 features]
x = torch.randn(2, 3)
# Edge indices: representing a graph with two edges (node 0 to node 1, node 1 to node 0)
edge_index = torch.tensor([[0, 1], [1, 0]], dtype=torch.long)
# Create a PINA graph object (similar to PyG)
data = Graph(x=x, edge_index=edge_index)
output = torch.randn(10)


# create problem
class MySupervisedProblem(AbstractProblem):
    input_variables = None
    output_variables = None
    conditions = {"condition": Condition(input=data, target=output)}


problem = MySupervisedProblem()

### 2. Model Design

In PINA we refer to the `model` as the object that solves the problem, e.g., a **Neural Network**. You can implement any neural network using the PyTorch `nn.Module`:

In [None]:
class SimpleNet(nn.Module):
    def __init__(self):
        super().__init__()  # <=== always remember this
        self.complex_net = nn.Sequential(
            nn.Linear(10, 20), nn.ReLU(), nn.Linear(20, 10)
        )

    def forward(self, x):
        return self.complex_net(x)


model = SimpleNet()

print(model)

or use the PINA built in models in the `pina.model` module, which you can explore further [here](https://mathlab.github.io/PINA/_rst/_code.html#models).

In [None]:
model = FeedForward(
    input_dimensions=10,
    output_dimensions=10,
    inner_size=20,
    n_layers=1,
    func=nn.ReLU,
)

print(model)

or you can use building blocks from `pina.model.block` to assemble Neural Network layers and build a model ([more in here](https://mathlab.github.io/PINA/_rst/_code.html#blocks))

> ‚ö†Ô∏è *Also for message passing networks we have built in blocks, but in `pina.model.messagepassing`*

In [None]:
from pina.model.block import ResidualBlock

model = nn.Sequential(
    ResidualBlock(input_dim=10, output_dim=20, hidden_dim=20),
    nn.ReLU(),
    ResidualBlock(input_dim=20, output_dim=20, hidden_dim=10),
)

print(model)

### 3. Solver Selection

[`Solvers`](https://mathlab.github.io/PINA/_rst/_code.html#solvers) are versatile objects in PINA designed to manage the training and optimization of machine learning models. They are built on top of the [PyTorch Lightning `LightningModule`](https://lightning.ai/docs/pytorch/stable/common/lightning_module.html), and handle key components of the learning process, including:

- Loss function minimization  
- Model optimization (optimizer, schedulers)
- Validation and testing workflows

**Solvers in PINA are modular and hierarchical!** 

| Feature / Argument | [`SingleSolverInterface`](https://mathlab.github.io/PINA/_rst/solver/solver_interface.html) | [`MultiSolverInterface`](https://mathlab.github.io/PINA/_rst/solver/multi_solver_interface.html) |
|--------------------|----------------------------------------------------------|------------------------------------------------------------|
| Purpose        | Solvers for **a single model** (e.g., supervised learning, PINNs) | Solvers for **multiple models** (e.g., GANs, ensembles)     |
| Inheriting Classes | `SupervisedSolver`, `PINN`, etc.                        | `DeepEnsemblePINN`, `GAROM`, etc.                           |
| problem        | ‚úÖ Required                                                | ‚úÖ Required                                                  |
| model(s)       | `model` ‚Äì Single model                                    | `models` ‚Äì One or more models                               |
| optimizer(s)   | `optimizer` ‚Äì Defaults to `torch.optim.Adam`              | `optimizers` ‚Äì Defaults to `torch.optim.Adam`               |
| scheduler(s)   | `scheduler` ‚Äì Defaults to `torch.optim.lr_scheduler.ConstantLR` | `schedulers` ‚Äì Defaults to `torch.optim.lr_scheduler.ConstantLR` |
| weighting(s)   | Optional, [see docs](https://mathlab.github.io/PINA/_rst/_code.html#losses-and-weightings) | Optional, [see docs](https://mathlab.github.io/PINA/_rst/_code.html#losses-and-weightings) |
| **use_lt**         | Whether to use LabelTensors as input                      | Whether to use LabelTensors as input                        |


**These classes are used to define the backbone**, i.e. setting the problem, the model(s), the optimizer(s) and scheduler(s), but miss a key component the `optimization_cycle` method. The **`optimization_cycle`** method is the core function responsible for computing losses for **all conditions** in a given training batch. Each condition (e.g. initial condition, boundary condition, PDE residual) contributes its own loss, which is tracked and returned in a dictionary. This method should return a dictionary mapping **condition names** to their respective **scalar loss values**.

For supervised learning tasks, where each condition consists of an input-target pair, for example, the `optimization_cycle` may look like this:

```python
def optimization_cycle(self, batch):
    """
    The optimization cycle for Supervised solvers.
    Computes loss for each condition in the batch.
    """
    condition_loss = {}
    for condition_name, data in batch:
        condition_loss[condition_name] = self.loss_data(
            input=data["input"], target=data["target"]
        )
    return condition_loss
```
In PINA, a **batch** is structured as a list of tuples, where each tuple corresponds to a specific training condition. Each tuple contains:

- The **name of the condition**
- A **dictionary of data** associated with that condition

for example:

```python
batch = [
    ("condition1", {"input": ..., "target": ...}),
    ("condition2", {"input": ..., "equation": ...}),
    ("condition3", {"input": ..., "target": ...}),
]
```

Fortunately, you don't need to implement the `optimization_cycle` yourself in most cases ‚Äî PINA already provides default implementations tailored to common solver types. These implementations are available through the solver interfaces and cover various training strategies.

1. [`PINNInterface`](https://mathlab.github.io/PINA/_rst/solver/physics_informed_solver/pinn_interface.html)  
   Implements the optimization cycle for **physics-based solvers** (e.g., PDE residual minimization) as well as other useful methods to compute PDE residuals.  
   ‚û§ [View method](https://mathlab.github.io/PINA/_rst/solver/physics_informed_solver/pinn_interface.html#pina.solver.physics_informed_solver.pinn_interface.PINNInterface.optimization_cycle)

2. [`SupervisedSolverInterface`](https://mathlab.github.io/PINA/_rst/solver/supervised_solver/supervised_solver_interface.html)  
   Defines the optimization cycle for **supervised learning tasks**, including traditional regression and classification.  
   ‚û§ [View method](https://mathlab.github.io/PINA/_rst/solver/supervised_solver/supervised_solver_interface.html#pina.solver.supervised_solver.supervised_solver_interface.SupervisedSolverInterface.optimization_cycle)

3. [`DeepEnsembleSolverInterface`](https://mathlab.github.io/PINA/_rst/solver/ensemble_solver/ensemble_solver_interface.html)  
   Provides the optimization logic for **deep ensemble methods**, commonly used for uncertainty quantification or robustness.  
   ‚û§ [View method](https://mathlab.github.io/PINA/_rst/solver/ensemble_solver/ensemble_solver_interface.html#pina.solver.ensemble_solver.ensemble_solver_interface.DeepEnsembleSolverInterface.optimization_cycle)

In [None]:
class MyFirstSolver(SupervisedSolverInterface, SingleSolverInterface):

    # loss data is responsible for "data driven loss"
    # for physics based learning you need to implement
    # loss_phys for building custom physics informed solvers
    def loss_data(self, input, target):
        network_output = self.forward(input)
        return self.loss(network_output, target)


# initialize (we use plain tensors!)
solver = MyFirstSolver(problem=MySupervisedProblem(), model=SimpleNet(), use_lt=False)

#### 4. Training

Finally the `Trainer` class is used to train the model. The `Trainer` class, based on **PyTorch Lightning**, offers many features that help:
- **Improve model accuracy**
- **Reduce training time and memory usage**
- **Facilitate logging and visualization**  

The great work done by the PyTorch Lightning team ensures a streamlined training process, we will see more in details some features later!


## üëâ Supervised Learning

**Supervised learning** is a machine learning paradigm where a model $\mathcal{M}_{\theta}$ is trained on a **labeled** dataset. In this setting, the goal is to learn a mapping from inputs $x$ to outputs $y$, using a dataset of known input-output pairs $\{x_i, y_i\}_{i=1}^N$. The model adjusts its internal parameters to minimize the discrepancy between its predictions and the actual target values, quantified by a loss function: $$\mathcal{L}=\frac{1}{N}\sum_{i=1}^N l(y_i, \mathcal{M}_{\theta}(x_i)).$$

We will now show how to apply what you have learned so far for two problems:

2. Machine Learned Interatomic Potential (*Ammonia System*)
2. Reduced Order Model learning (*Navier Stokes backstep*)

### Machine Learning Interatomic Potentials

Machine Learning Interatomic Potentials (MLIPs) [7] are models that learn to approximate the **potential energy surface (PES)** of atomic systems using data from quantum mechanical simulations.

The core idea is to **regress the total energy** $E$ of a system given atomic positions $\boldsymbol{r}\in\mathbb{R}^3$ and atomic numbers $Z\in\mathbb{R}$ ‚Äî and then obtain the **atomic forces** $F$ as the **negative gradient of the predicted energy** with respect to atomic positions:

$$
\mathbf{F}_i = -\nabla_{\mathbf{r}_i} E(\{\mathbf{r}_j\})
$$

This makes the problem naturally **supervised**:  
We train a model to predict both:

- **Energies**: scalar value per structure  
- **Forces**: vector (3D) per atom, derived from energy via automatic differentiation

These models are used in **molecular dynamics**, **geometry optimization**, and **materials discovery**, where accurate yet fast energy and force evaluations are critical. We first divide in train and test data, we will use the `Data` structure as it naturally handles graphs.

In [None]:
# Read trajectory from .xyz
atoms_list = read("data/ammonia.xyz", index=":")  # list of ase.Atoms
random.shuffle(atoms_list)

# Divide train and test
train_atoms = atoms_list[:140]
test_atoms = atoms_list[140:]

# Build input_data and output_data for training
input_data = []
output_data = []
for atoms in train_atoms:
    pos = LabelTensor(
        torch.tensor(atoms.get_positions(), dtype=torch.float32),
        ["x", "y", "z"],
    )
    z = torch.tensor(atoms.get_atomic_numbers(), dtype=torch.long)
    energy = torch.tensor([atoms.get_potential_energy()], dtype=torch.float32)
    forces = torch.tensor(atoms.get_forces(), dtype=torch.float32)
    input_data.append(Graph(pos=pos, z=z))
    output_data.append(Graph(energy=energy, forces=forces))


# general supervised problems, work for any data-structure: Data, LabelTensor,...
class SupervisedProblem(AbstractProblem):
    conditions = {}
    output_variables = None
    input_variables = None

    def __init__(self, input_, output_, input_variables=None, output_variables=None):
        # Set input and output variables
        self.input_variables = input_variables
        self.output_variables = output_variables
        # Set the condition
        self.conditions["data"] = Condition(input=input_, target=output_)
        super().__init__()


# Create the problem (we save energy meand and std for later)
energy_mean = torch.stack([d.energy for d in output_data]).mean()
energy_std = torch.stack([d.energy for d in output_data]).std()
problem = SupervisedProblem(input_data, output_data)

We also build similary the test data

In [None]:
# Building input_data and output_data for testing
input_batch = LabelBatch.from_data_list(
    [
        Graph(
            pos=LabelTensor(
                torch.tensor(atoms.get_positions(), dtype=torch.float32),
                ["x", "y", "z"],
            ),
            z=torch.tensor(atoms.get_atomic_numbers(), dtype=torch.long),
        )
        for atoms in test_atoms
    ],
)
true_energies = torch.stack(
    [
        torch.tensor(atoms.get_potential_energy(), dtype=torch.float32)
        for atoms in test_atoms
    ]
).flatten()
true_forces = torch.stack(
    [torch.tensor(atoms.get_forces(), dtype=torch.float32) for atoms in test_atoms]
).flatten()

We will use **Deep Tensor Graph Neural Networks** (DT-GNNs), a class of models designed to respect the physical symmetries of the problem. In particular, these networks are **invariant under translations and rotations**, which is essential when modeling atomic systems where absolute positions should not affect the predictions.

The message-passing update for each atom $i$ at layer $l+1$ takes the form:

$$
h_i^{(l+1)} = h_i^{(l)} + \sum_{j \ne i} \gamma\left( \phi(h_i^{(l)}) \cdot \psi(\| \mathbf{r}_{ij} \|) \right)
$$

- $h_i^{(l)}$: feature vector of atom $i$ at layer $l$ (for first layer we embed $Z$)  
- $\| \mathbf{r}_{ij} \|$: distance between atoms $i$ and $j$.  
- $\phi, \psi, \gamma$: learnable neural network functions.


In [None]:
class ForceField(nn.Module):

    def __init__(self, energy_mean, energy_std, cutoff=5):
        super().__init__()
        self.mean = energy_mean
        self.std = energy_std
        self.cutoff = cutoff
        self.embedding = nn.Embedding(100, 64)
        self.linear = nn.Linear(1, 64)
        self.gnn1 = DeepTensorNetworkBlock(64, 64)
        self.gnn2 = DeepTensorNetworkBlock(64, 64)
        self.readout = nn.Sequential(nn.Linear(64, 64), nn.SiLU(), nn.Linear(64, 1))

    @torch.set_grad_enabled(
        True
    )  # we set it to true because we need to compute gradients of forces
    def forward(self, data):
        pos, z, batch = data.pos, data.z, data.batch
        pos = pos.requires_grad_()
        edge_index, dist = self.build_edges(pos, batch)
        h = self.embedding(z)
        edge_attr = self.linear(dist)
        # egnn
        h = self.gnn1(h, edge_index, edge_attr)
        h = self.gnn2(h, edge_index, edge_attr)
        # redout
        r = self.readout(h)
        # compute energy
        energy = scatter(r, batch, dim=0, reduce="sum") * self.std + self.mean
        predicted_energy = LabelTensor(energy, "E")
        # compute forces
        predicted_forces = -grad(
            predicted_energy, pos, d=["x", "y", "z"], components=["E"]
        )
        return predicted_energy.tensor, predicted_forces.tensor  # return tensors

    def build_edges(self, pos, batch):
        edge_index_list, edge_attr_list = [], []
        for i in batch.unique():
            idx = (batch == i).nonzero(as_tuple=True)[0]
            pos_i = pos[idx].tensor
            rel = pos_i[:, None, :] - pos_i[None, :, :]
            dist = torch.norm(rel, dim=-1)
            mask = (dist < self.cutoff) & (dist > 0)
            row, col = mask.nonzero(as_tuple=True)
            edge_index = torch.stack([idx[row], idx[col]], dim=0)
            edge_attr = dist[mask].unsqueeze(1)
            edge_index_list.append(edge_index)
            edge_attr_list.append(edge_attr)
        return torch.cat(edge_index_list, dim=1), torch.cat(edge_attr_list, dim=0)

Once we have built the network, we will define a **custom loss** for the solver to train it. To do this, we will use the `LossInterface` class. These interfaces allow us to customize the loss function while benefiting from existing infrastructure.


In [None]:
class ForceFieldLoss(LossInterface):
    def forward(self, input, target):
        predicted_energy, predicted_forces = input
        target_energy, target_forces = target.energy, target.forces
        energy_loss = torch.abs(target_energy - predicted_energy).mean()
        forces_loss = torch.abs(target_forces - predicted_forces).mean()
        return energy_loss + forces_loss

We can now train!

In [None]:
model = ForceField(energy_mean=energy_mean, energy_std=energy_std)
solver = SupervisedSolver(problem, model, loss=ForceFieldLoss(), use_lt=False)
trainer = Trainer(
    solver=solver,
    max_epochs=300,
    batch_size=32,
    accelerator="cpu",
    train_size=0.8,
    val_size=0.2,
)
trainer.train()

Finally we can plot the prediction on the test dataset:

In [None]:
predicted_energies, predicted_forces = solver(input_batch)

# Ensure tensors are on CPU and detached from graph
pred_energies = predicted_energies.flatten().detach()
pred_forces = predicted_forces.flatten().detach()

# Create figure
fig, axs = plt.subplots(1, 2, figsize=(12, 5))

# --- Energy plot ---
axs[0].scatter(pred_energies, true_energies, alpha=0.6)
axs[0].plot(
    [true_energies.min(), true_energies.max()],
    [true_energies.min(), true_energies.max()],
    "r--",
    label="Ideal",
)
axs[0].set_xlabel("Predicted Energy")
axs[0].set_ylabel("True Energy")
axs[0].set_title("Energy Prediction")
axs[0].legend()

# --- Force plot ---
axs[1].scatter(pred_forces, true_forces, alpha=0.3, s=10)
axs[1].plot(
    [true_forces.min(), true_forces.max()],
    [true_forces.min(), true_forces.max()],
    "r--",
    label="Ideal",
)
axs[1].set_xlabel("Predicted Force Component")
axs[1].set_ylabel("True Force Component")
axs[1].set_title("Force Prediction")
axs[1].legend()

plt.tight_layout()
plt.show()

### Reduced Order Modelling

Reduced Order Modeling (ROM) techniques aim to approximate the solution of complex differential equations‚Äîparticularly **parametric partial differential equations (PDEs)**‚Äîin a fast and efficient way, often enabling **real-time simulation** [2].

In this session, we will explore a popular ROM approach known as **POD-NN**, which combines:

- **Proper Orthogonal Decomposition (POD)**: for reducing the dimensionality of the solution space (i.e., extracting dominant modes or features), and  
- **Neural Networks (NN)**: for learning a nonlinear mapping from parameters to the reduced solution space [3].

Specifically, we will:
1. Use POD to construct a low-dimensional basis from high-fidelity solution snapshots.
2. Train a simple **Multilayer Perceptron (MLP)** to predict the reduced coefficients given new parameter inputs.
3. Reconstruct the full solution using the reduced basis and predicted coefficients.

> ‚ÑπÔ∏è Although we use an MLP here for simplicity, the approach is flexible and can accommodate other machine learning models as well.

Let‚Äôs dive in!

We utilize the [Smithers](https://github.com/mathLab/Smithers) library to gather the parametric snapshots (more on the dataset specific can be found in the library). Specifically, we use the `NavierStokesDataset` class, which contains a collection of parametric solutions to the Navier-Stokes equations in a 2D L-shaped domain. The parameter in this case is the inflow velocity.

The dataset comprises 500 snapshots of the velocity fields (along the $x$, $y$ axes, and the magnitude), as well as the pressure fields, along with their corresponding parameter values.

To visually inspect the snapshots, let's also plot the data points alongside the reference solution. This reference solution represents the expected output of our model.

In [None]:
from smithers.dataset import NavierStokesDataset

dataset = NavierStokesDataset()

fig, axs = plt.subplots(1, 4, figsize=(14, 3))
for ax, p, u in zip(axs, dataset.params[:4], dataset.snapshots["mag(v)"][:4]):
    ax.tricontourf(dataset.triang, u, levels=80)
    ax.set_title(rf"$\mu$ = {p[0]:.2f}")

We aim to regress the magnitude of the velocity field $\|\boldsymbol{v}\|$ for different values of the inlet velocity $\mu$:

In [None]:
# building the problem
u = torch.tensor(dataset.snapshots["mag(v)"], dtype=torch.float32)
p = torch.tensor(dataset.params, dtype=torch.float32)

problem = SupervisedProblem(p, u)

We will now build the model. We highlight that the POD modes are directly computed by means of the singular value decomposition (SVD) over the input data, and not trained using the backpropagation approach. Only the weights of the regressor (MLP) are actually trained during the optimization loop.

In [None]:
# build the model
class PODNN(torch.nn.Module):
    def __init__(
        self, in_feautures, pod_rank, num_layers=2, hidden_dim=64, func=nn.SiLU
    ):
        super().__init__()
        self.pod = PODBlock(pod_rank)
        self.nn = FeedForward(
            input_dimensions=in_feautures,
            output_dimensions=pod_rank,
            n_layers=num_layers,
            inner_size=hidden_dim,
            func=func,
        )

    def forward(self, x):
        coefficents = self.nn(x)  # 1. compute reduced coefficient
        return self.pod.expand(coefficents)  # 2. expand the POD basis

    def fit_pod(self, x):
        self.pod.fit(x)


pod_nn_model = PODNN(in_feautures=1, pod_rank=20)

# build the solver
solver = SupervisedSolver(problem, pod_nn_model, use_lt=False)

Before starting, we need to fit the POD basis on the training dataset. This can be easily done in **PINA** as well:

In [None]:
trainer = Trainer(
    solver=solver,
    max_epochs=500,
    batch_size=None,
    accelerator="cpu",
    train_size=0.8,
    val_size=0.1,
    test_size=0.1,
)

# fit the pod basis
trainer.data_module.setup("fit")  # set up the dataset
train_data = trainer.data_module.train_dataset.get_all_data()
x_train = train_data["data"]["target"]  # extract data for training
pod_nn_model.fit_pod(x=x_train)

# now train
trainer.train()

Done! Now that the computationally expensive part is over, we can load the model in the future to infer new parameters (simply by loading the checkpoint file automatically created by `Lightning`, see the directory `lightning_logs/...`) or test its performances. We measure the relative error for both the training and test datasets, printing the mean error.

In [None]:
# extract train and test data
trainer.data_module.setup("test")  # set up the dataset
p_train = trainer.data_module.train_dataset.conditions_dict["data"]["input"]
u_train = trainer.data_module.train_dataset.conditions_dict["data"]["target"]
p_test = trainer.data_module.test_dataset.conditions_dict["data"]["input"]
u_test = trainer.data_module.test_dataset.conditions_dict["data"]["target"]

# compute statistics
u_test_nn = solver(p_test)
u_train_nn = solver(p_train)

relative_error_train = torch.norm(u_train_nn - u_train) / torch.norm(u_train)
relative_error_test = torch.norm(u_test_nn - u_test) / torch.norm(u_test)

fig, axs = plt.subplots(1, 3, figsize=(12, 4))
param_idx = 0
axs[0].set_title(f"Real Snapshots")
axs[1].set_title(f"POD-NN")
axs[2].set_title(f"Error POD-NN")
cm = axs[0].tricontourf(dataset.triang, u_test[param_idx].detach(), levels=80)
plt.colorbar(cm, ax=axs[0])
cm = axs[1].tricontourf(dataset.triang, u_test_nn[param_idx].detach(), levels=80)
plt.colorbar(cm, ax=axs[1])
cm = axs[2].tricontourf(
    dataset.triang,
    (u_test_nn[param_idx] - u_test[param_idx]).abs().detach(),
    levels=80,
)
plt.colorbar(cm, ax=axs[2])
plt.show()

print("Error summary for POD-NN model:")
print(f"  Train: {relative_error_train.item():.4%}")
print(f"  Test:  {relative_error_test.item():.4%}")

## üëâ Physics Informed Learning

**Physics Informed Learning** [4] is a machine learning paradigm where a model $\mathcal{M}_{\theta}$ is trained not only on data but also by incorporating known physical laws, typically expressed as differential equations. In this framework, the loss function is augmented with terms derived from physical constraints, such as:

- Residuals of partial differential equations (PDEs)
- Boundary or initial conditions
- Conservation laws (e.g., mass, energy, momentum)

For example, in **Physics-Informed Neural Networks (PINNs)**, the total loss $\mathcal{L}$ typically includes:

$$
\mathcal{L} = \lambda_{\text{data}} \mathcal{L}_{\text{data}} + \lambda_{\text{physics}} \mathcal{L}_{\text{physics}}
$$

where:

- $\mathcal{L}_{\text{data}}$ measures the error on observed data,
- $\mathcal{L}_{\text{physics}}$ penalizes violation of physical laws,
- $\lambda_{\text{physics}, \lambda_{\text{data}}}$ are a weighting coefficients.

We will now show how to apply PINA for PINNs problem, specifically:

1. Train Physics Informed Networks with PINA (*Harmonic Oscillator*)
2. Benchmarking PINN variants (*Advection Equation*)
3. Solving Inverse Problems (*Poisson Equation*)

### Solving the Harmonic Oscillator with PINNs

The harmonic oscillator can be described by a simple ODE:

$$
\begin{equation}
\begin{cases}
&\frac{d^2}{dt^2}u(t) +  u(t) = 0 \quad t\in(0,2\pi)\\
&u(t=0) = 0 \\
&u'(t=0) = 1 \\
\end{cases}
\end{equation}
$$

Although this is a relatively simple problem, we will use it to demonstrate the full pipeline for training a **Physics-Informed Neural Network (PINN)** from scratch using the **PINA** library following the standard 4 steps pipeline.

In [None]:
# define the equation as a simple python function
def ode_equation(input_, output_):
    u_tt = laplacian(output_, input_, components=["u"], d=["t"])
    u = output_.extract(["u"])
    return u_tt + u


class HarmonicOscillator(TimeDependentProblem):

    # input_variables directly inferred from temporal_domain
    output_variables = ["u"]
    temporal_domain = CartesianDomain({"t": [0, 2 * torch.pi]})
    domains = {
        "initial": CartesianDomain({"t": 0.0}),
        "pde": temporal_domain,
    }

    # one condition for each equation
    conditions = {
        "initial": Condition(domain="initial", equation=FixedValue(0.0)),
        "initial_vel": Condition(
            domain="initial", equation=FixedGradient(1.0, "u", "t")
        ),
        "pde": Condition(domain="pde", equation=Equation(ode_equation)),
        # you can even pass your points!
    }

    def solution(self, pts):
        return torch.sin(pts.extract(["t"]))


harmonic_oscillator_problem = HarmonicOscillator()

Differently from data-driven problems, differential-problems need to specify the domain type. If you look at our ODE definition, the temporal varibale $t$ is defined in the interval $(0,2\pi)$, and accordingly the `temporal_domain` is a `CartesianDomain` (segment) with the input variable `t` in `[0,2*torch.pi]`. To know more about the Domain class see the [related tutorial](https://mathlab.github.io/PINA/tutorial6/tutorial.html). Different problems require different domain, here below we summarize the relevant ones:

| Problem Type            | Required Domain                |
|-------------------------|--------------------------------|
| `SpatialProblem`        | `spatial_domain`              |
| `TimeDependentProblem`  | `temporal_domain`             |
| `ParametricProblem`     | `parameter_domain`            |
| `InverseProblem`        | `unknown_parameter_domain`    |


As you can see, we implemented the `ode_equation` function which given the model ouput and input returns the equation residual. These residuals are the ones minimized during PINN optimization. 

**How are the residuals computed?**
Given the output we perform differential operation using the [operator modulus](https://mathlab.github.io/PINA/_rst/operator.html). It is pretty intuitive, each differential operator takes the following inputs: 
- A tensor on which the operator is applied. 
- A tensor with respect to which the operator is computed. 
- The names of the output variables for which the operator is evaluated. 
- The names of the variables with respect to which the operator is computed.
We also have a `fast` version of differential operators, where no checks are performed. This can be used to boost performances, once you know the standard ones are doing their job. 

Notice that we do not pass directly a `python` function, but an `Equation` object, which is initialized with the `python` function. This is done so that all the computations and internal checks are done inside **PINA**, see [the related tutorials](https://mathlab.github.io/PINA/tutorial12/tutorial.html) for more.

#### Generate data for Physical Problems

When training physics based models, data can come in form of direct numerical simulation results (tensors, graph), or points in the domains which need to be sampled. In case we perform unsupervised learning, we just need the collocation points for training, i.e. points where we want to evaluate the neural network. Sampling point in **PINA** is very easy. But first, let's check if the domains are dicsretized by using the `are_all_domains_discretised` method.

In [None]:
harmonic_oscillator_problem.are_all_domains_discretised

This is false becase the input points are not available (we need to discretize!). To discretise the problem you can use the `discretise_domain` method:

In [None]:
# sampling 20 points in [0, 1] through discretization in all locations
harmonic_oscillator_problem.discretise_domain(n=20, mode="grid", domains="all")

# sampling 20 points in (0, 1) through latin hypercube sampling in D, and 1 point in x0
harmonic_oscillator_problem.discretise_domain(n=20, mode="grid", domains=["pde"])
harmonic_oscillator_problem.discretise_domain(n=1, mode="random", domains=["initial"])

The points are saved in a python `dict`, and can be accessed by calling the attributes input_pts` or `discretised_domains` of the problem.

In [None]:
print("Input points:", harmonic_oscillator_problem.input_pts)
for location in harmonic_oscillator_problem.input_pts:
    coords = harmonic_oscillator_problem.input_pts[location].flatten()
    plt.scatter(coords, torch.zeros_like(coords), s=10, label=location)
_ = plt.legend()

Once the problem is setup, training can be done in less then ten lines:

In [None]:
# build the model (2 layers by 64 with GELU activation, default)
model = FeedForward(
    input_dimensions=1, output_dimensions=1, layers=[64, 64], func=nn.GELU
)
# build the solver
pinn = PINN(harmonic_oscillator_problem, model)
# build the trainer and train
trainer = Trainer(pinn, max_epochs=1000, accelerator="cpu")
trainer.train()

In [None]:
# and now we can visualize the results
with torch.no_grad():
    new_data = harmonic_oscillator_problem.temporal_domain.sample(1000, "grid")
    true_solution = harmonic_oscillator_problem.solution(new_data)
    pinn_solution = pinn(new_data)
    plt.figure(figsize=(10, 4))
    plt.subplot(1, 3, 1)
    plt.plot(new_data, true_solution, label="True")
    plt.legend()
    plt.subplot(1, 3, 2)
    plt.plot(new_data, pinn_solution, "r", label="Predicted")
    plt.legend()
    plt.subplot(1, 3, 3)
    plt.plot(
        new_data, (true_solution - pinn_solution).abs(), "r", label="Absolute Error"
    )
    plt.legend()
    plt.tight_layout()

### Benchmarking PINNs variants

One of PINA‚Äôs key strengths is its modular design, which makes benchmarking both straightforward and flexible. To illustrate this, we will use the one-dimensional Advection equation. The one-dimensional Advection problem we aim to solve is mathematically defined as:

\begin{equation}
\begin{cases}
\frac{\partial}{\partial t} u(x, t) + c \frac{\partial}{\partial x} u(x, t) = 0 \quad (x, t) \in [0,2\pi]\times [0,1], \\
u(x, t) = \sin(x) \quad (x, t) \in [0,2\pi]\times {0},
\end{cases}
\end{equation}

While this example is simple and pedagogical, it's important to note that the solution for $c\gg 1$ posits a great challenge for PINNs. In PINA we have it already implemented in the `problem.zoo` module.


In [None]:
from pina.problem.zoo.advection import AdvectionProblem

advection_problem = AdvectionProblem(c=5)  # increase c to make it more challenging
advection_problem.discretise_domain(
    sample_rules={"x": {"n": 20, "mode": "grid"}, "t": {"n": 200, "mode": "grid"}},
    domains=["D"],
)
advection_problem.discretise_domain(
    sample_rules={"x": {"n": 20, "mode": "grid"}, "t": {"n": 1, "mode": "grid"}},
    domains=["t0"],
)

For benchmarking will use the classical `PINN` solver and its variants, namely `GradientPINN` [5] and `RBAPINN` [6]. We brifely report below how their loss function is defined:

**Classical PINN**
$$\theta_{\rm{best}}=\min_{\theta}\mathcal{L}_{\rm{problem}}(\theta), \quad  \mathcal{L}_{\rm{problem}}(\theta)= \frac{1}{N_{D}}\sum_{i=1}^N
\mathcal{L}(\mathcal{A}[\mathcal{M}_{\theta}(\mathbf{x}_i)]) +
\frac{1}{N}\sum_{i=1}^N
\mathcal{L}(\mathcal{B}[\mathcal{M}_{\theta}(\mathbf{x}_i)])$$

**Gradient PINN**
$$\theta_{\rm{best}}=\min_{\theta}\mathcal{L}_{\rm{problem}}(\theta), \quad  \mathcal{L}_{\rm{problem}}(\theta)= \frac{1}{N_{D}}\sum_{i=1}^N
\mathcal{L}(\mathcal{A}[\mathcal{M}_{\theta}(\mathbf{x}_i)]) +
\frac{1}{N}\sum_{i=1}^N
\mathcal{L}(\mathcal{B}[\mathcal{M}_{\theta}(\mathbf{x}_i)]) + \frac{1}{N\cdot D}\sum_{i=1}^N\sum_{d=1}^D \nabla_{x^{(d)}}\mathcal{L}(\mathcal{A}[\mathcal{M}_{\theta}(\mathbf{x}_i)])$$

**RBAPINN**
$$\theta_{\rm{best}}=\min_{\theta}\mathcal{L}_{\rm{problem}}(\theta), \quad  \mathcal{L}_{\rm{problem}}(\theta)= \frac{1}{N_{D}}\sum_{i=1}^N
\lambda_{\Omega}^i\mathcal{L}(\mathcal{A}[\mathcal{M}_{\theta}(\mathbf{x}_i)]) +
\frac{1}{N}\sum_{i=1}^N
\lambda_{\partial \Omega}^i\mathcal{L}(\mathcal{B}[\mathcal{M}_{\theta}(\mathbf{x}_i)]), \quad \lambda^i_{k+1} \leftarrow \gamma\lambda^i_{k} + 
        \eta\frac{\lvert r_i\rvert}{\max_j \lvert r_j\rvert}\;\forall k,i$$

In [None]:
trainers = {}
model = FeedForward(2, 1, inner_size=32, n_layers=2)
for Solver in [RBAPINN]:
    solver = Solver(
        problem=advection_problem,
        model=model,
        optimizer=TorchOptimizer(torch.optim.Adam, lr=1e-4),
    )
    checkpoint_callback = ModelCheckpoint(
        dirpath="benchmark_ckpt/",
        filename=f"{Solver.__name__}",
    )
    logger = TensorBoardLogger(
        save_dir="benchmark_logs/",
        name=f"{Solver.__name__}",
    )
    trainer = Trainer(
        solver,
        max_epochs=100000,
        accelerator="cpu",
        enable_model_summary=False,
        callbacks=[checkpoint_callback],
        logger=logger,
    )
    # trainer.train() # <=== Uncomment if you want to train ~30 mins for model
    trainers[Solver.__name__] = trainer
    model.apply(
        lambda layer: (
            layer.reset_parameters() if hasattr(layer, "reset_parameters") else None
        )
    )

In [None]:
def plot_solution(pts, predicted, target):
    plt.figure(figsize=(10, 5))
    plt.subplot(1, 2, 1)
    plt.tricontourf(
        pts.extract(["x"]).flatten(), pts.extract(["t"]).flatten(), predicted
    )
    plt.colorbar()
    plt.title("Neural Network solution")
    plt.subplot(1, 2, 2)
    plt.tricontourf(pts.extract(["x"]).flatten(), pts.extract(["t"]).flatten(), target)
    plt.colorbar()
    plt.title("True solution")
    plt.figure()


with torch.no_grad():
    loss_fn = LpLoss(p=2, relative=True)
    for idx, Solver in enumerate([PINN, GradientPINN, RBAPINN]):
        # get the solver LOAD IT FROM CHECKPOINT <======
        solver_name = Solver.__name__
        solver = Solver.load_from_checkpoint(
            checkpoint_path=f"benchmark_ckpt/{solver_name}.ckpt",
            problem=advection_problem,
            model=model,
        )
        # compute solution
        pts = solver.problem.input_pts["D"]
        predicted = solver(pts).extract("u").flatten()
        target = solver.problem.solution(pts).flatten()
        # sample new test points
        print(
            f"L2 relative error {solver_name}:   {loss_fn(target, predicted).item():.5}"
        )
# Plot solution only for the last
plot_solution(pts, predicted, target)

You can also look at how the loss (and other metrics) decrease, by analyzing the `benchmark_logs`. Just run

```bash
tensorboard --logdir=benchmark_logs
```

To obtain:
<p align="center">
    <img src="https://raw.githubusercontent.com/dario-coscia/KTH-Summer-School-PINNs-PINA/main/imgs/logs.png" width="1000"/>
</p>

### Solving Inverse Problems with PINNs

The inverse problem is defined as a Poisson equation with homogeneous boundary conditions with unknown parameters:

\begin{equation}
\begin{cases}
\Delta u = e^{-2(x - \mu_1)^2 - 2(y - \mu_2)^2} \quad \text{in } \Omega, \\
u = 0 \quad \text{on } \partial \Omega, \\
u(\mu_1, \mu_2) = \text{data}
\end{cases}
\end{equation}

Here, $\Omega$ is the square domain $[-2, 2] \times [-2, 2]$, and $\partial \Omega = \Gamma_1 \cup \Gamma_2 \cup \Gamma_3 \cup \Gamma_4$ represents the union of its boundaries.

This type of setup defines an *inverse problem*, which has two primary objectives:

- **Find the solution** $u$ that satisfies the Poisson equation,
- **Identify the unknown parameters** $(\mu_1, \mu_2)$ that best fit the given data (as described by the third equation in the system).

To tackle both objectives, we will define an `InverseProblem` using **PINA**!

We import the pre-saved data corresponding to the true parameter values $(\mu_1, \mu_2) = (0.5, 0.5)$.  
These values represent the *optimal parameters* that we aim to recover through neural network training.

In particular, we load:

- `input` points ‚Äî the spatial coordinates where observations are available,
- `target` points ‚Äî the corresponding $u$ values (i.e., the solution evaluated at the `input` points).

This data will be used to guide the inverse problem and supervise the network‚Äôs prediction of the unknown parameters.


In [None]:
data_output = torch.load("data/pinn_solution_0.5_0.5", weights_only=False).detach()
data_input = torch.load("data/pts_0.5_0.5", weights_only=False)

Next, we initialize the Poisson problem, which inherits from the `SpatialProblem` and `InverseProblem` classes.  
In this step, we need to define all the variables and specify the domain in which our unknown parameters $(\mu_1, \mu_2)$ reside.

Note that the Laplace equation also takes these unknown parameters as inputs. These parameters will be treated as variables that the neural network will optimize during the training process, enabling it to learn the optimal values for $(\mu_1, \mu_2)$.

In [None]:
def laplace_equation(input_, output_, params_):
    force_term = torch.exp(
        -2 * (input_.extract(["x"]) - params_["mu1"]) ** 2
        - 2 * (input_.extract(["y"]) - params_["mu2"]) ** 2
    )
    delta_u = laplacian(output_, input_, components=["u"], d=["x", "y"])
    return delta_u - force_term


class Poisson(SpatialProblem, InverseProblem):

    output_variables = ["u"]
    x_min, x_max = -2, 2
    y_min, y_max = -2, 2
    spatial_domain = CartesianDomain({"x": [x_min, x_max], "y": [y_min, y_max]})
    unknown_parameter_domain = CartesianDomain({"mu1": [-1, 1], "mu2": [-1, 1]})

    domains = {
        "g1": CartesianDomain({"x": [x_min, x_max], "y": y_max}),
        "g2": CartesianDomain({"x": [x_min, x_max], "y": y_min}),
        "g3": CartesianDomain({"x": x_max, "y": [y_min, y_max]}),
        "g4": CartesianDomain({"x": x_min, "y": [y_min, y_max]}),
        "D": CartesianDomain({"x": [x_min, x_max], "y": [y_min, y_max]}),
    }

    conditions = {
        "g1": Condition(domain="g1", equation=FixedValue(0.0)),
        "g2": Condition(domain="g2", equation=FixedValue(0.0)),
        "g3": Condition(domain="g3", equation=FixedValue(0.0)),
        "g4": Condition(domain="g4", equation=FixedValue(0.0)),
        "D": Condition(domain="D", equation=Equation(laplace_equation)),
        "data": Condition(input=data_input, target=data_output),
    }


problem = Poisson()

Next, we define the neural network model that will be used for solving the inverse problem. In this case, we use a simple FeedForeard model, but you could build one that imposes *hard constraints* on the boundary conditions, similar to the approach used in the [Wave tutorial](https://mathlab.github.io/PINA/tutorial3/tutorial.html) to have better performances!

In [None]:
model = FeedForward(
    layers=[20, 20, 20],
    func=torch.nn.Softplus,
    output_dimensions=len(problem.output_variables),
    input_dimensions=len(problem.input_variables),
)

After that, we discretize the spatial domain.

In [None]:
problem.discretise_domain(20, "grid", domains=["D"])
problem.discretise_domain(
    1000,
    "random",
    domains=["g1", "g2", "g3", "g4"],
)

Here, we define a simple callback for the trainer. This callback is used to save the parameters predicted by the neural network during training.  
The parameters are saved every 100 epochs as `torch` tensors in a specified directory (in our case, `tutorial_logs`).

The goal of this setup is to read the saved parameters after training and visualize their trend across the epochs. This allows us to monitor how the predicted parameters evolve throughout the training process.


In [None]:
# temporary directory for saving logs of training
tmp_dir = "tutorial_logs"


class SaveParameters(Callback):
    """
    Callback to save the parameters of the model every 100 epochs.
    """

    def on_train_epoch_end(self, trainer, __):
        if trainer.current_epoch % 100 == 99:
            torch.save(
                trainer.solver.problem.unknown_parameters,
                "{}/parameters_epoch{}".format(tmp_dir, trainer.current_epoch),
            )

Then, we define the `PINN` object and train the solver using the `Trainer`

In [None]:
max_epochs = 1500
pinn = PINN(problem, model, optimizer=TorchOptimizer(torch.optim.Adam, lr=0.005))
# define the trainer for the solver
trainer = Trainer(
    solver=pinn,
    accelerator="cpu",
    max_epochs=max_epochs,
    default_root_dir=tmp_dir,
    enable_model_summary=False,
    callbacks=[SaveParameters()],
    train_size=1.0,
    val_size=0.0,
    test_size=0.0,
)
trainer.train()

One can now see how the parameters vary during the training by reading the saved solution and plotting them. The plot shows that the parameters stabilize to their true value before reaching the epoch $1500$!

In [None]:
epochs_saved = range(99, max_epochs, 100)
parameters = torch.empty((int(max_epochs / 100), 2))
for i, epoch in enumerate(epochs_saved):
    params_torch = torch.load(
        "{}/parameters_epoch{}".format(tmp_dir, epoch), weights_only=False
    )
    for e, var in enumerate(pinn.problem.unknown_variables):
        parameters[i, e] = params_torch[var].data

# Plot parameters
plt.close()
plt.plot(epochs_saved, parameters[:, 0], label="mu1", marker="o")
plt.plot(epochs_saved, parameters[:, 1], label="mu2", marker="s")
plt.ylim(-1, 1)
plt.grid()
plt.legend()
plt.xlabel("Epoch")
plt.ylabel("Parameter value")
plt.show()

## üëâ Generative Modelling

**Generative models** learn a (conditional) probability distribution using neural network models.  
Given a target distribution $p_{\mathrm{target}}(x)$, the goal of a *fast generative model* is to learn a transformation that allows us to efficiently draw new samples that resemble the target data. Unlike diffusion models, which rely on slow stochastic sampling procedures, fast generative models aim to produce high-quality samples in only a few neural network evaluations‚Äîideally in a single deterministic forward pass.

A key example of such an approach is **flow matching** [8]. Flow matching methods learn a *continuous normalizing flow* defined by an ordinary differential equation (ODE)

$$
\frac{d x_t}{d t} = v_\theta(x_t, t),
$$

where $v_\theta$ is a neural network that parameterizes a time-dependent velocity field transporting samples from a simple base distribution $p_0$ (e.g., a Gaussian) toward the target distribution $p_{\mathrm{target}}$.  
To train this velocity field, flow matching introduces an analytically defined *target flow* $u_t(x)$ that moves samples along known interpolation paths‚Äîmost commonly the linear flow

$$
x_t = (1 - t)x_0 + t x_1,
\qquad 
u_t(x_t \mid x_0, x_1) = x_1 - x_0,
$$

where $x_0 \sim p_0$ and $x_1 \sim p_{\mathrm{target}}$.  
The learning objective simply matches the model velocity to this target velocity:

$$
\mathcal{L}(\theta)
= \mathbb{E}_{t,\, x_0,\, x_1}
\left[ \left\| v_\theta(x_t, t) - u_t(x_t \mid x_0, x_1) \right\|^2 \right].
$$

Once trained, generation is extremely fast: we draw $x_0 \sim p_0$ and solve the ODE forward from $t=0$ to $t=1$, giving a sample distributed as $p_{\mathrm{target}}$. Because the learned flow is deterministic and smooth, this leads to rapid, stable sampling‚Äîillustrating why flow matching is a powerful tool for building fast generative models.

In [None]:
class GenerativeProblem(AbstractProblem):

    conditions = {}
    input_variables = output_variables = None

    def __init__(self, n_points=5000, input_variables=None, output_variables=None):
        # Set input and output variables
        self.input_variables = input_variables
        self.output_variables = output_variables
        input_data = self.create_dataset(n_points)
        # Set the condition
        self.conditions["data"] = DataCondition(input=input_data)
        super().__init__()

    @staticmethod
    def create_dataset(batch_size):
        x1 = torch.rand(batch_size) * 4 - 2
        x2_ = torch.rand(batch_size) - torch.randint(high=2, size=(batch_size,)) * 2
        x2 = x2_ + (torch.floor(x1) % 2)
        data = 1.0 * torch.cat([x1[:, None], x2[:, None]], dim=1) / 0.45
        return data.float()

# let's see some samples
data = GenerativeProblem.create_dataset(5000)
fig, axs = plt.subplots(1, 1, figsize=(3, 3))
H = axs.hist2d(
    data[:, 0], data[:, 1], 300, range=((-5, 5), (-5, 5))
)
cmin = 0.0
cmax = torch.quantile(torch.from_numpy(H[0]), 0.99).item()
norm = cm.colors.Normalize(vmax=cmax, vmin=cmin)
_ = axs.hist2d(
    data[:, 0],
    data[:, 1],
    300,
    range=((-5, 5), (-5, 5)),
    norm=norm,
)
axs.set_aspect("equal")
axs.axis("off")
plt.tight_layout()
plt.show()

Let's now create the solver and the Model

In [None]:
class FlowMatchingSolver(SingleSolverInterface):
    # this is needed when defying the optimization loop
    accepted_conditions_types = DataCondition

    # FM optimization
    def optimization_cycle(self, batch):
        condition_loss = {}
        for condition_name, points in batch:
            # set final time to input point
            x_1 = points["input"]
            # sample time
            t = torch.rand((x_1.shape[0], 1))
            # sample initial state
            x_0 = torch.randn_like(x_1)
            # compute x_t
            x_t = t * x_1 + (1 - t) * x_0
            # compute true field
            u_t = x_1 - x_0
            # compute ML field
            input_ = {"data": x_t, "time": t}
            v_t = self.forward(input_)
            condition_loss[condition_name] = torch.nn.functional.mse_loss(v_t, u_t)
        return condition_loss

    # sampling
    @torch.no_grad()
    def sample(self, data_size, integration_steps):
        # sample initial gaussian
        x_0 = torch.randn(size=data_size)
        # do euler integration (or any other)
        traj = [x_0]
        times = [0]
        for t in range(1, integration_steps):
            input_ = {
                "data": traj[-1],
                "time": torch.tensor(times[-1]),
            }
            x_t = traj[-1] + (1 / integration_steps) * self.forward(input_)
            traj.append(x_t)
            times.append((t + 1) / integration_steps)

        return traj, times


class VectorFieldModel(torch.nn.Module):
    def __init__(self):
        super().__init__()
        self.input_dim = 2
        self.time_dim = 1
        self.mlp = FeedForward(
            input_dimensions=3,
            output_dimensions=2,
            n_layers=3,
            inner_size=128,
            func=torch.nn.Tanh
        )

    def forward(self, input):
        # extract input
        x = input["data"]
        t = input["time"]
        # reshaping
        sz = x.size()
        t = t.reshape(-1, 1).expand(x.shape[0], 1)
        h = torch.cat([x, t], dim=1)
        return self.mlp(h).reshape(*sz)

Then we train!

In [None]:
trainer = Trainer(
    solver=FlowMatchingSolver(
        problem=GenerativeProblem(5000), model=VectorFieldModel(), use_lt=False
    ),
    max_epochs=2000,
    batch_size=None,
    accelerator="cpu",
)

# now train
trainer.train()

Let's sample from the generative model

In [None]:
integration_steps = 10
traj, times = trainer.solver.sample(
    data_size=(100000, 2),
    integration_steps=integration_steps,
)
from matplotlib import cm

fig, axs = plt.subplots(1, 10, figsize=(16, 8))
idx = 0
for i in range(0, len(times), len(times)//9):
    H = axs[idx].hist2d(traj[i][:, 0], traj[i][:, 1], 300, range=((-5, 5), (-5, 5)))
    cmin = 0.0
    cmax = torch.quantile(torch.from_numpy(H[0]), 0.99).item()
    norm = cm.colors.Normalize(vmax=cmax, vmin=cmin)
    _ = axs[idx].hist2d(
        traj[i][:, 0],
        traj[i][:, 1],
        300,
        range=((-5, 5), (-5, 5)),
        norm=norm,
    )
    axs[idx].set_aspect("equal")
    axs[idx].axis("off")
    axs[idx].set_title("t= %.2f" % times[i])
    idx += 1
plt.tight_layout()
plt.show()

## üëâ References

1. Coscia, D., Ivagnes, A., Demo, N., & Rozza, G. (2023), *Physics-Informed Neural networks for Advanced modeling.*, Journal of Open Source Software, 8(87), 5352.
2. Rozza G., Stabile G., Ballarin F. (2022). Advanced Reduced Order Methods and Applications in Computational Fluid Dynamics, Society for Industrial and Applied Mathematics. 
3. Hesthaven, J. S., & Ubbiali, S. (2018). Non-intrusive reduced order modeling of nonlinear problems using neural networks. Journal of Computational Physics, 363, 55-78.
4. Raissi, M., Perdikaris, P., & Karniadakis, G. E. (2019). Physics-informed neural networks: A deep learning framework for solving forward and inverse problems involving nonlinear partial differential equations. Journal of Computational physics, 378, 686-707.
5. Yu, J., Lu, L., Meng, X., & Karniadakis, G. E. (2022). Gradient-enhanced physics-informed neural networks for forward and inverse PDE problems. Computer Methods in Applied Mechanics and Engineering, 393, 114823.
6. Anagnostopoulos, S. J., Toscano, J. D., Stergiopulos, N., & Karniadakis, G. E. (2023). Residual-based attention and connection to information bottleneck theory in PINNs. arXiv preprint arXiv:2307.00379.
7. Unke, O. T., Chmiela, S., Sauceda, H. E., Gastegger, M., Poltavsky, I., Schutt, K. T., ... & MuÃàller, K. R. (2021). Machine learning force fields. Chemical Reviews, 121(16), 10142-10186.
8. Lipman, Yaron, et al. "Flow matching for generative modeling." arXiv preprint arXiv:2210.02747 (2022).