# Artificial Intelligence and Modern Physics: a Two Way Connection

The following is part of the hands-on sessions of the [AIPhy](https://aiphy.fisica.unimib.it/) school.
The notebook aims at giving an **introduction to machine learning** methods in Python.
Tutorials deal with different **unsupervised and supervised algorithms**.
Students will learn how to build these algorithms from scratch using basic Python classes.
They will then apply different techniques to test and evaluate them on toy and real world datasets.

## Preliminary Actions

I recommend you use a Python **virtual environment** to setup your work area:

```bash
python -m venv .venv
```

At the time of writing the Python version is `3.10.12`: you can change this either with your distribution package manager, or by installing a **Conda** environment (e.g. `conda create -y -n aiphy python"==3.10.12" && conda activate aiphy`).

You can then activate it with:

```bash
source .venv/bin/activate
```

Before we begin, remember to install the requirements:

```bash
pip install -r requirements.txt
```

We shall use mainly the `numpy` library for the algorithms, and the `matplotlib` library for plots.

## Neural Networks

Neural Networks (NNs) are a family of algorithms, usually trained using (mini-batch) gradient descent, capable of producing accurate predictions.
Their training procedure is based on the [backpropagation](https://en.wikipedia.org/wiki/Backpropagation) algorithm, which implements the _chain rule_ of derivation to compute the gradient of a **loss function** with respect to the different parameters of the function.

In simpler terms, a NN is a function

$$
f\; \colon\; \mathbb{R}^{w_{(0)}} \to \mathbb{R}^{w_{(N-1)}},
$$

which is the result of the composition of multiple functions $g^{(n)}\; \colon\; \mathbb{R}^{w_{(n-1)}} \to \mathbb{R}^{w_{(n)}}$:

$$
f = g^{(N-1)} \circ g^{(N-2)} \circ \dots \circ g^{(0)}.
$$

These functions $g^{(n)}$ are called **layers** of the NN, and are such that

- the function $f$ is continuous and differentiable,
- the function $f$ can model **non linear** functions,
- we can apply the _chain rule_ to compute the gradient descent of the function $f$.

### Backpropagation

The "bricks" $g^{(n)}$ are non linear functions, often modelled as the composition of an _affine_ application, represented by a _weight_ $W$ and a _bias_ $b$

$$
z^{(n)} = W^{(n)}\, x^{(n-1)} + b^{(n)},
$$

and a non linear _activation function_ $a^{(n)}$ (their nature can differ according to the type of network we are dealing with):

$$
g^{(n)}(z^{(n)}) = a^{(n)}(z^{(n)}).
$$

The network is finally _trained_ using a **loss function** (or _Lagrangian_), used to minimise the "distance" to a target:

$$
\mathcal{L}\left( y, f(x) \right) = \mathbb{E}\left[ \mathfrak{F}(y, f(x)) \right],
$$

where $\mathfrak{F}$ is a function of the **targets** (labels) $y$ and the **predictions** $f(x)$ of the network (which directly depend on the parameters).

The **backpropagation** has the objective to compute the gradient of the loss function with respect to the parameters, in order to update them according to the steepest descent:

$$
\begin{cases}
W^{(n)} & \gets W^{(n)} - \alpha \frac{\partial \mathcal{L}}{\partial W^{(n)}}
\\
b^{(n)} & \gets b^{(n)} - \alpha \frac{\partial \mathcal{L}}{\partial b^{(n)}}
\end{cases}
\quad
\forall n = 1, 2, \dots, N.
$$

This can be implemented by:

1. _moving forward_ by computing the prediction $f(x)$ and the value of the loss function $\mathcal{L}(y, f(x))$,
2. _tracing back_ by computing the gradients of the loss function with respect to the inputs of each layer, in **reversed order**,
3. _updating_ the parameters $W^{(n)}$ and $b^{(n)}$ according to the steepest descent.

In other words, we need to compute:

$$
\frac{\partial \mathcal{L}}{\partial W^{(n)}}
=
\frac{\partial \mathcal{L}}{\partial z^{(n)}}\,
\cdot
\frac{\partial z^{(n)}}{\partial W^{(n)}}
=
\frac{\partial \mathcal{L}}{\partial g^{(N)}}\,
\cdot
\frac{\partial g^{(N)}}{\partial z^{(N)}}\,
\odot
\frac{\partial z^{(N)}}{\partial g^{(N-1)}}\,
\cdot
\frac{\partial g^{(N-1)}}{\partial z^{(N-2)}}\,
\odot
\dots\,
\cdot
\frac{\partial z^{(n)}}{\partial W^{(n)}},
$$

where the $\cdot$ operation identifies a matrix multiplication, while $\odot$ is the element-wise [Hadamard](https://en.wikipedia.org/wiki/Hadamard_product_(matrices)) product, which occurs when the derivative of the activation function is computed entry-wise.
Most of the code boils down to computing:

$$
\frac{\partial \mathcal{L}}{\partial z^{(n)}}
=
\delta^{(n)}
=
\frac{\partial \mathcal{L}}{\partial g^{(N)}}\,
\cdot
\frac{\partial g^{(N)}}{\partial z^{(N)}}\,
\odot
\frac{\partial z^{(N)}}{\partial g^{(N-1)}}\,
\cdot
\frac{\partial g^{(N-1)}}{\partial z^{(N-2)}}\,
\odot
\dots\,
\frac{\partial g^{(n)}}{\partial z^{(n)}}
=
\delta^{(n+1)}
\frac{\partial z^{(n+1)}}{\partial z^{(n)}}
=
\delta^{(n+1)}\,
\odot
\frac{\partial z^{(n+1)}}{\partial g^{(n)}}\,
\cdot
\frac{\partial g^{(n)}}{\partial z^{(n)}}.
$$

This is a **recursive** relation we can use to implement the gradient descent algorithm on a network of functions.
With some smart caching in our code, this is doable efficiently.

### Mini-batch Gradient Descent

Usually, NNs are trained on huge amounts of data (not our case, fortunately!).
This presents the disadvantage of not being able to load all data in memory at once.
One possible solution is to adopt a **mini-batch** approach, that is to split the data into smaller subsets (few samples per batch) and compute the gradient descent on each batch: even though the descent is "biased", this helps to speed things up, and, sometimes, to make things possible.
We can even get to the extreme case of **stochastic** gradient descent, were the update of the parameters is compute on each sample individually (that is, when the size of the mini-batch is 1).

### Coding Neural Networks

In what follows, we implement a **fully connected** NN, and train it with mini-batch gradient descent using Python classes.
We first import the necessary libraries, and proceed by first building abstract classes, with a shared logic, and then implementing the methods on a case-by-case basis.

In [1]:
%load_ext autoreload
%autoreload 2
%matplotlib inline

In [2]:
import graphviz  # this is only for visualisation
import numpy as np
import pandas as pd

from joblib import dump, load
from matplotlib import pyplot as plt
from numpy.typing import NDArray
from sklearn.cluster import KMeans
from sklearn.decomposition import PCA
from sklearn.metrics import accuracy_score, precision_recall_fscore_support
from sklearn.preprocessing import StandardScaler
from tqdm import tqdm
from typing import Dict, Iterator, Tuple, Optional
from IPython.display import display, Image

We then select a style for the plots: you can freely play with this parameter, but I like this one.

In [3]:
plt.style.use('grayscale')

We fix a random number generator for reproducibility and testing.

In [4]:
gen = np.random.default_rng(42)

### The Abstract Implementation

We first consider the structure of the network: we shall need different types of components, such as **layers**, **activations**, and **loss functions**.

#### Layers

We tackle the problem of the layers.
These are structures which contain **trainable parameters** and need to be updated during the gradient descent.
Thus, we shall need:

1. a **parameter** dictionary, where we store weights and biases,
2. some methods useful for visualisation of the structure (such as a method to pretty print the parameters, or to return the number of parameters in the layer);
3. a **forward** pass function to compute the output of the layer,
4. a **backward** pass function to compute the gradient of the loss function with respect to the _inputs_ of the layer (i.e. the $\delta^{(n)}$ in the formula above),
5. an **update** function to update the parameters of the layer by computing the gradients of the loss function with respect to the _parameters_ of the layer (using the $\delta^{(n)}$ in the formula above).

In [5]:
class Layer:
    """An abstract layer class."""

    def __init__(self) -> None:
        self.params: Dict[str, NDArray] = {}

    def __repr__(self) -> str:
        return self.__class__.__name__  # it's for visualisation...

    def __str__(self) -> str:
        return self.__repr__()  # same...

    def __call__(self, X: NDArray) -> NDArray:
        """
        Forward pass of the layer.

        Parameters
        ----------
        X : NDArray
            Input data.

        Returns
        -------
        NDArray
            Output of the layer.
        """
        return self.forward(X)

    @property
    def type(self) -> str:
        return 'layer'

    @property
    def named_parameters(self) -> Dict[str, NDArray]:
        return self.params

    @property
    def n_params(self) -> int:
        return int(sum([np.prod(p.shape) for p in self.params.values()]))

    def forward(self, X: NDArray) -> NDArray:
        """
        Forward pass of the layer.

        Parameters
        ----------
        X : NDArray
            Input data.

        Returns
        -------
        NDArray
            Output of the layer.
        """
        raise NotImplementedError(
            'All subclasses of Layer must implement the forward method!')

    def backward(self) -> NDArray:
        """
        Backward pass of the layer.

        Returns
        -------
        NDArray
            Gradient with respect to the input of the layer.
        """
        raise NotImplementedError(
            'All subclasses of Layer must implement the backward method!')

    def update(self, delta: NDArray, lr: float) -> None:
        """
        Update parameters of the layer.

        Parameters
        ----------
        delta : NDArray
            Backpropagation gradient.
        lr : float
            Learning rate.
        """
        raise NotImplementedError(
            'All subclasses of Layer must implement the update method!')

We can then implement a simple linear layer $z^{(n)}$:

- what kind of information on $z^{(n)}$ is needed to fully characterise the function?
- how do we initialise the parameters?
- what is the gradient of $z^{(n)}$ with respect to the input $x^{(n)}$?
- often the backpropagation is shown for a single sample, but how do we update the parameters of the layer using an entire mini-batch?

In [6]:
class Linear(Layer):
    """A linear (fully connected) layer."""

    def __init__(self, in_features: int, out_features: int) -> None:
        """
        Parameters
        ----------
        in_features : int
            Number of input features.
        out_features : int
            Number of output features.
        """
        super().__init__()
        # Save the hyperparameters
        # YOUR CODE HERE

        # Initialize the parameters
        self.params = {}
        # YOUR CODE HERE

    def __repr__(self) -> str:
        return f'Linear(in_features={self.in_features}, out_features={self.out_features})'

    @property
    def type(self) -> str:
        return 'linear'

    def forward(self, X: NDArray) -> NDArray:
        """
        Forward pass of the linear layer.

        Parameters
        ----------
        X : NDArray
            Input data.

        Returns
        -------
        NDArray
            Output of the linear layer.

        Returns
        -------
        NDArray
            Output of the linear layer.
        """
        self._input = # YOUR CODE HERE  # maybe cache the input for the update
        return # YOUR CODE HERE

    def backward(self) -> NDArray:
        """
        Backward pass of the linear layer.

        Returns
        -------
        NDArray
            Gradient with respect to the input of the linear layer.
        """
        return # YOUR CODE HERE

    def update(self, delta: NDArray, lr: float) -> None:
        """
        Update parameters of the linear layer.

        Parameters
        ----------
        delta : NDArray
            Backpropagation gradient.
        lr : float
            Learning rate.
        """
        # Parameters is a dictionary, so we use a dictionary of gradients
        self._grads = {
            # YOUR CODE HERE
        }
        self.params['W'] -= lr * self._grads['W']
        self.params['b'] -= lr * self._grads['b']

In [None]:
# THIS IS A TEST CELL. DO NOT DELETE IT.

# Create a linear layer
layer = Linear(in_features=32, out_features=16)
if layer.n_params != 16 * (32 + 1):
    raise ValueError('The  nunumber of parameters do not match!')

# Test the forward pass
y_pred = gen.normal(size=(8, 32))
y_true = layer(y_pred)
if y_true.shape != (8, 16):
    display(Image('img/allegri_giacca.gif', width=500))
    raise ValueError('The shape of the output is not correct!')

# Test the backward pass
grad_mse = layer.backward()
if grad_mse.shape != (32, 16):
    display(Image('img/allegri_giacca.gif', width=500))
    raise ValueError('The shape of the gradient is not correct!')
if (grad_mse != layer.named_parameters['W']).all():
    display(Image('img/allegri_giacca.gif', width=500))
    raise ValueError('The gradient is not correct!')

# Test the update pass
delta = gen.normal(size=(8, 16))
layer.update(delta, lr=0.001)
if layer._grads['W'].shape != layer.named_parameters['W'].shape:
    display(Image('img/allegri_giacca.gif', width=500))
    raise ValueError('The shape of the gradient of the weight is not correct!')
if layer._grads['b'].shape != layer.named_parameters['b'].shape:
    display(Image('img/allegri_giacca.gif', width=500))
    raise ValueError('The shape of the gradient of the bias is not correct!')

# All tests passed
print('All tests passed successfully!')
display(Image('img/allegri_calma.gif', width=500))

#### Activations

We can move to the activations.
These functions implement the non linearity needed to model the output of the NN.
They do not contain parameters.
However, they play a role in backpropagation.

In [8]:
class Activation:
    """An abstract activation layer."""

    def __repr__(self) -> str:
        return self.__class__.__name__

    def __str__(self) -> str:
        return self.__repr__()

    def __call__(self, X: NDArray) -> NDArray:
        """
        Forward pass of the layer.

        Parameters
        ----------
        X : NDArray
            Input data.

        Returns
        -------
        NDArray
            Output of the layer.
        """
        return self.forward(X)

    @property
    def n_params(self) -> int:
        return 0

    @property
    def type(self) -> str:
        return 'activation'

    def forward(self, X: NDArray) -> NDArray:
        """
        Forward pass of the layer.

        Parameters
        ----------
        X : NDArray
            Input data.

        Returns
        -------
        NDArray
            Output of the layer.
        """
        raise NotImplementedError(
            'All subclasses of Activation must implement the forward method!')

    def backward(self) -> NDArray:
        """
        Backward pass of the layer.

        Returns
        -------
        NDArray
            Gradient with respect to the input of the layer.
        """
        raise NotImplementedError(
            'All subclasses of Activation must implement the backward method!')


Let us implement a couple of them, starting from trivial up to the most famous of all.
We shall consider:

- an **identity** activation, which we shall use only to help us think about the structure of the NN;
- the **ReLU** activation, omni-present and universal.

In [9]:
class Identity(Activation):
    """An identity activation layer"""

    def __repr__(self) -> str:
        return 'Identity()'

    @property
    def type(self) -> str:
        return 'identity'

    def forward(self, X: NDArray) -> NDArray:
        """
        Forward pass of the identity activation layer.

        Parameters
        ----------
        X : NDArray
            Input data.

        Returns
        -------
        NDArray
            Output of the identity activation layer.
        """
        self._input = # YOUR CODE HERE
        return # YOUR CODE HERE

    def backward(self) -> NDArray:
        """
        Backward pass of the identity activation layer.

        Returns
        -------
        NDArray
            Gradient with respect to the input of the identity activation layer.
        """
        return np.ones_like(self._input)

In [10]:
class ReLU(Activation):
    """A ReLU activation layer."""

    def __init__(self) -> None:
        super().__init__()

    def __repr__(self) -> str:
        return 'ReLU()'

    @property
    def type(self) -> str:
        return 'relu'

    def forward(self, X: NDArray) -> NDArray:
        """
        Forward pass of the ReLU activation layer.

        Parameters
        ----------
        X : NDArray
            Input data.

        Returns
        -------
        NDArray
            Output of the ReLU activation layer.
        """
        self._input = # YOUR CODE HERE
        return # YOUR CODE HERE

    def backward(self) -> NDArray:
        """
        Backward pass of the ReLU activation layer.

        Returns
        -------
        NDArray
            Gradient with respect to the input of the ReLU activation layer.
        """
        return (
            #YOUR CODE HERE
        ).astype(self._input.dtype)

In [None]:
# THIS IS A TEST CELL. DO NOT DELETE IT.

# Create the activations layers
identity = Identity()
relu = ReLU()

# Test the forward pass
y_pred = gen.normal(size=(8, 32))
y_identity = identity(y_pred)
y_relu = relu(y_pred)
if y_identity.shape != y_pred.shape:
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The shape of the output of the identity layer is not correct!')
if (y_identity != y_pred).all():
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError('The output of the identity layer is not correct!')
if y_relu.shape != y_pred.shape:
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The shape of the output of the ReLU layer is not correct!')
if (y_relu < 0).any():
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError('The output of the ReLU layer is not correct!')

# Test the backward pass
grad_identity = identity.backward()
grad_relu = relu.backward()
if grad_identity.shape != y_pred.shape:
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The shape of the gradient of the identity layer is not correct!')
if (grad_identity != np.ones_like(y_pred)).all():
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError('The gradient of the identity layer is not correct!')
if grad_relu.shape != y_pred.shape:
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The shape of the gradient of the ReLU layer is not correct!')
if (grad_relu != (y_pred > 0)).all():
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError('The gradient of the ReLU layer is not correct!')

# All tests passed successfully
print('All tests passed successfully!')
display(Image('img/allegri_calma.gif', width=500))

#### Loss Functions

We now implement the loss function used in the NN (often called _criterion_ in many software implementations).
In this notebook, we shall consider a regression problem, first, thus we focus on a **mean squared error** loss.
However, we shall also try to make it a **cross entropy** loss function for a multi-class classification problem.

In [12]:
class Criterion:
    """An abstract criterion (loss function) class."""

    def __init__(self) -> None:
        super().__init__()

    def __repr__(self) -> str:
        return self.__class__.__name__

    def __str__(self) -> str:
        return self.__repr__()

    def __call__(self, y_pred: NDArray, y_true: NDArray) -> NDArray:
        """
        Forward pass of the loss function.

        Parameters
        ----------
        y_pred : NDArray
            Predictions.
        y_true : NDArray
            Targets.

        Returns
        -------
        NDArray
            Output of the layer.
        """
        return self.forward(y_pred, y_true)

    @property
    def type(self) -> str:
        return 'loss'

    def forward(self, y_pred: NDArray, y_true: NDArray) -> NDArray:
        """
        Forward pass of the loss function.

        Parameters
        ----------
        y_pred : NDArray
            Predictions.
        y_true : NDArray
            Targets.

        Returns
        -------
        NDArray
            Output of the layer.
        """
        raise NotImplementedError(
            'All subclasses of Layer must implement the forward method!')

    def backward(self) -> NDArray:
        """
        Backward pass of the loss function.

        Returns
        -------
        NDArray
            Gradient with respect to the input of the layer.
        """
        raise NotImplementedError(
            'All subclasses of Layer must implement the backward method!')


In [13]:
class MSELoss(Criterion):
    """Mean squared error loss."""

    def __init__(self) -> None:
        super().__init__()

    def __repr__(self) -> str:
        return 'MSELoss()'

    @property
    def type(self) -> str:
        return 'mse'

    def forward(self, y_pred: NDArray, y_true: NDArray) -> NDArray:
        """
        Forward pass of the mean squared error loss.

        Parameters
        ----------
        y_pred : NDArray
            Predictions.
        y_true : NDArray
            Targets.

        Returns
        -------
        NDArray
            Output of the mean squared error loss.
        """
        self._input = # YOUR CODE HERE
        return # YOUR CODE HERE

    def backward(self) -> NDArray:
        """
        Backward pass of the mean squared error loss.

        Returns
        -------
        NDArray
            Gradient with respect to the input of the mean squared error loss.
        """
        return # YOUR CODE HERE

The case of the **Cross Entropy** loss might need an introduction.
This loss is usually applied in a multi-class (let $C$ be the number of classes) context, where the **labels** must be expressed in _one-hot_ encoding.
That is, given a batch of targets $y \in \mathbb{N}^B$, where $y_i$ represents the **class** of the $i$-th sample, we convert:

$$
y
\quad
\longrightarrow
\quad
y \in \mathbb{N}^{B \times C},
$$

where

$$
y_{ij}
=
\begin{cases}
1 \quad &\text{if sample $i$ belongs to class $j$}
\\
0 \quad &\text{otherwise}
\end{cases}.
$$

This enables the possible comparison between the predictions $\widehat{y} \in \mathbb{R}^{B \times C}$, which we interpret as _probabilities_ of belonging to a class, and the _one-hot_ encoded vector.
Our implementation of **cross entropy** should therefore include a step of encoding of the class targets.

As we introduced it in class, the **probability** vector computed in the forward pass is modelled by a **softmax** activation function in the last layer:

$$
\widehat{y} = \mathrm{softmax}(z) = \frac{e^z}{\sum\limits_{k = 0}^{C-1} e^{z_k}}.
$$

In this implementation, we integrate the **softmax** layer _directly into the definition of the cross entropy_.
In other words, our implementation takes a vector of _logits_ as inputs.
As we shall see, this greatly simplifies the computations (and makes the code way more numerically stable).

We start by computing the gradient of the **cross-entropy**:

$$
\mathrm{H}(y, \widehat{y})
=
- \sum\limits_{i = 0}^{B - 1} y_i \ln(\widehat{y}_i).
$$

This is simply:

$$
\frac{\partial H}{\partial \widehat{y}_j}
=
- \frac{y_j}{\widehat{y}_j}.
$$

However, we know that

$$
\widehat{y} = \mathrm{softmax}(z),
$$

and we need to compute

$$
\frac{\partial \widehat{y}_j}{\partial z_i}
$$

to use the _chain rule_

$$
\frac{\partial H}{\partial z_i}
=
\sum\limits_{j = 0}^{B - 1}
\frac{\partial H}{\partial \widehat{y}_j}
\frac{\partial \widehat{y}_j}{\partial z_i}.
$$

This derivative is quite easy to compute:

$$
\frac{\partial \widehat{y}_j}{\partial z_i}
=
\frac{e^{z_j} \delta_{ij} \sum\limits_{k = 0}^{C - 1} e^{z_k} - e^{z_i} e^{z_j}}{\left( \sum\limits_{k = 0}^{C - 1} e^{z_k} \right)^2}
=
\widehat{y_j} \left( \delta_{ij} - \widehat{y_i} \right).
$$

Plugging the expression into the previous, the gradient of the **cross-entropy** is:

$$
\frac{\partial H}{\partial z_i}
=
\widehat{y_i} - y_i.
$$

In [14]:
class CrossEntropyLoss(Criterion):
    """Cross entropy loss."""

    def __init__(self, n_classes: int) -> None:
        """
        Parameters
        ----------
        n_classes : int
            The number of classes (used for one-hot encoding)
        """
        super().__init__()
        self.n_classes = # YOUR CODE HERE

    def __repr__(self) -> str:
        return 'CrossEntropyLoss()'

    @property
    def type(self) -> str:
        return 'cross_entropy'

    def _one_hot_encoder(self, y: NDArray) -> NDArray:
        """
        One-hot encode the classes.

        Parameters
        ----------
        y : NDArray
            The vector of classes.

        Returns
        -------
        NDArray
            The one-hot encoded classes.
        """
        # YOUR CODE HERE

    def _softmax(self, y: NDArray) -> NDArray:
        """
        Compute the softmax activation.

        Parameters
        ----------
        y : NDArray
            A vector of logits.

        Returns
        -------
        NDArray
            A vector of probabilities.
        """
        # YOUR CODE HERE

    def forward(self, y_pred: NDArray, y_true: NDArray) -> NDArray:
        """
        Forward pass of the cross entropy loss.

        Parameters
        ----------
        y_pred : NDArray
            Predictions (vector of logits).
        y_true : NDArray
            Targets (vector of class labels).

        Returns
        -------
        NDArray
            Output of the cross entropy loss.
        """
        eps = np.finfo(float).eps  # avoid natural logarithm of zero
        self._input = []  # store the inputs

        # Encode the true labels
        classes = # YOUR CODE HERE
        self._input.append(classes)

        # Activate the predictions
        activations = # YOUR CODE HERE
        self._input.append(activations)

        return float(
            # YOUR CODE HERE
        )

    def backward(self) -> NDArray:
        """
        Backward pass of the cross entropy loss

        Returns
        -------
        NDArray
            Gradient with respect to the input of the cross entropy loss.
        """
        return # YOUR CODE HERE

In [None]:
# THIS IS A TEST CELL. DO NOT DELETE IT.

# Create the criterion
mse_loss = MSELoss()
ce_loss = CrossEntropyLoss(n_classes=3)

# Test the forward pass
y_pred = gen.normal(size=(8, 3))
y_true = gen.normal(size=(8, 3))
y_true_classes = np.array([0, 2, 1, 0, 1, 2, 2, 0])

mse_loss_value = mse_loss(y_pred, y_true)
ce_loss_value = ce_loss(y_pred, y_true_classes)
if not isinstance(mse_loss_value, float):
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The output of the mean squared error loss should be a scalar!')
if mse_loss_value < 0:
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The output of the mean squared error loss should be a positive scalar!'
    )
if not np.isclose(
        mse_loss_value, 0.5 *
    (np.linalg.norm(y_pred - y_true, ord=2, axis=1)**2).mean()):
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The output of the mean squared error loss should be the squared error!'
    )
if not isinstance(ce_loss_value, float):
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The output of the cross entropy loss should be a scalar!')

# Test the backward pass
mse_grad = mse_loss.backward()
ce_grad = ce_loss.backward()
if mse_grad.shape != y_pred.shape:
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The shape of the gradient of the mean squared loss is not correct!')
if ce_grad.shape != y_pred.shape:
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError(
        'The shape of the gradient of the cross entropy loss is not correct!')

# All tests passed successfully
print('All tests passed successfully!')
display(Image('img/allegri_calma.gif', width=500))

#### The Neural Network

Let us put all the layers together in a **NN** architecture!
The idea is to _stack_ the layers sequentially (this is not the only possibility, but let us stick to this one for simplicity).

**HOMEWORK**

Imagine a "multi-task" NN, where multiple _heads_ (NNs) are connected to a common _backbone_ (NN):

- how would you track the "flow" of the gradient in the network?
- what kind of changes in the code we could make to compute the correct gradient?

In [16]:
class Network:
    """A neural network."""

    def __init__(self,
                 *layers: Layer,
                 criterion: Optional[Criterion] = None,
                 lr: float = 0.001) -> None:
        """
        Parameters
        ----------
        layers : Layer
            Layers of the network (comma separated).
        criterion : Criterion
            Loss function. Can be None if the network is only used for inference.
        lr : float
            Learning rate.
        """
        self.layers = layers
        self.criterion = criterion
        self.lr = lr

    def __repr__(self) -> str:
        layer_list = []
        for layer in self.layers:
            layer_list.append(repr(layer))
        if self.criterion is None:
            return ' -> '.join(layer_list)
        return ' -> '.join(layer_list) + ' -> ' + str(self.criterion)

    def __str__(self) -> str:
        return self.__repr__()

    @property
    def named_parameters(self) -> Dict[str, NDArray]:
        """
        Return a dictionary of the parameters of the network.

        Returns
        -------
        Dict[str, NDArray]
            Dictionary of the parameters of the network.
        """
        params = {
            f'layer_{n:d}': layer.named_parameters
            for (n, layer) in enumerate(self.layers)
            if isinstance(layer, Layer)
        }
        return params

    @property
    def n_params(self) -> int:
        """
        Return the number of parameters of the network.

        Returns
        -------
        int
            Number of parameters of the network.
        """
        n_params = 0
        for layer in self.layers:
            if isinstance(layer, Layer):
                n_params += layer.n_params
        return int(n_params)

    @property
    def summary(self) -> str:
        """
        Return a summary of the network.

        Returns
        -------
        str
            Summary of the network.
        """
        summary = [
            'Neural Network -- Parameter Summary',
            '---------------------------------------',
            '|  id  |    type    | no. parameters  |'
        ]
        for n, layer in enumerate(self.layers):
            summary.append(
                f'| {n+1: <4d} | {layer.type: <10s} | {layer.n_params: <15d} |'
            )
        summary.extend([
            '---------------------------------------',
            f'loss function: {self.criterion.type}',
            f'trainable parameters: {self.n_params:d}'
        ])

        return '\n'.join(summary)

    def submodel(self, stop: int = None, start: int = 0) -> 'Network':
        """
        Return a submodel of the network.

        Parameters
        ----------
        stop : int
            Stop index of the submodel.
        start : int
            Start index of the submodel.

        Returns
        -------
        Network
            Submodel of the network.
        """
        return Network(*self.layers[start:stop], criterion=None)

    def graph(self) -> graphviz.Digraph:
        """
        Return a graph representation of the network.

        Returns
        -------
        graphviz.Digraph
            Graph representation of the network.
        """
        G = graphviz.Digraph()

        # Add nodes
        G.node('I', 'Input')
        for n, layer in enumerate(self.layers):
            G.node(f'L{n}', f'{str(layer)}')
        if self.criterion is not None:
            G.node('output', f'{str(self.criterion)}')

        # Add edges
        G.edge('I', 'L0')
        for n in range(len(self.layers) - 1):
            G.edge(f'L{n}', f'L{n+1}')
        if self.criterion is not None:
            G.edge(f'L{n+1}', 'output')
        return G

    def __call__(self, X: NDArray, y: NDArray) -> NDArray:
        """
        Forward pass of the network.

        Parameters
        ----------
        X : NDArray
            Input data.
        y : NDArray
            Targets.

        Returns
        -------
        NDArray
            Output of the network.
        """
        return self.forward(X, y)

    def predict(self, X: NDArray) -> NDArray:
        """
        Predictions of the network.

        Parameters
        ----------
        X : NDArray
            Input data.

        Returns
        -------
        NDArray
            Predictions of the network.
        """
        # YOUR CODE HERE

    def forward(self, X: NDArray, y: NDArray) -> NDArray:
        """
        Forward pass of the network.

        Parameters
        ----------
        X : NDArray
            Input data.
        y : NDArray
            Targets.

        Returns
        -------
        NDArray
            Output of the network.
        """
        # YOUR CODE HERE

    def backward(self) -> None:
        """Backward pass of the network."""
        # YOUR CODE HERE

Let us explore the network we have just created.
This is often called MultiLayer Perceptron (MLP):

In [None]:
mlp = Network(Linear(5, 4),
              ReLU(),
              Linear(4, 3),
              ReLU(),
              Linear(3, 2),
              Identity(),
              criterion=MSELoss(),
              lr=0.0001)
mlp

As we have just created it, let us print a summary of the layers in the network: how many parameters can we expect to have?

In [None]:
print(mlp.summary)

We can even visualise the network graphically!

In [None]:
mlp.graph()

### The Dataset

Unfortunately, we are not done, yet: the NN is nothing without data!
Let us create a Python **iterator** capable of looping over the data.
Specifically, let us consider a dataset of **features** $X$ and **targets** $y$, split over multiple (mini-)batches.

**N.B.** a Python **iterator** is a class which implements, at least, the `__iter__` method. The `__len__` method enables the `len` function.

**N.B.** the dataset we create will be split across batches in this tutorial. However, it is fully loaded into memory from the beginning: can you see advantages/disadvantages of using mini-batch gradient descent even in this case?

In [20]:
class Dataset:
    """Dataset of tensors (in-memory)."""

    def __init__(self, X: NDArray, y: NDArray, batch_size: int) -> None:
        """
        Parameters
        ----------
        X : NDArray
            Features.
        y : NDArray
            Targets.
        batch_size : int
            Batch size.
        """
        self.X = X
        self.y = y
        self.batch_size = batch_size

        # Check if the batch size is valid
        if batch_size > X.shape[0]:
            # YOUR CODE HERE

        # Split the data into batches
        # YOUR CODE HERE
        self._features = # YOUR CODE HERE
        self._targets = # YOUR CODE HERE

    def __len__(self) -> int:
        """
        Return the number of batches.

        Returns
        -------
        int
            Number of batches.
        """
        return len(self._features)

    def __iter__(self) -> Iterator[Tuple[NDArray, NDArray]]:
        """
        Iterate over the batches.

        Returns
        -------
        Iterator[Tuple[NDArray, NDArray]]
            Iterator over the batches.
        """
        for f, t in zip(self._features, self._targets):
            yield f, t

In [None]:
# THIS IS A TEST CELL. DO NOT DELETE IT.

# Generate some data
y_pred = gen.normal(size=(125, 32))
y_true = gen.normal(size=(125, 2))

# Create the dataset
data = Dataset(y_pred, y_true, batch_size=16)
if len(data) != 125 // 16 + 1:
    display(Image('img/allegri_dipoco.gif', width=500))
    raise ValueError('The number of batches is not correct!')

data = Dataset(y_pred, y_true, batch_size=128)
if len(data) != 1:
    display(Image('img/allegri_giacca.gif', width=500))
    raise ValueError(
        'The number of batches is not correct (even though batch size is 128)!'
    )

# All tests passed successfully
print('All tests passed successfully!')
display(Image('img/allegri_calma.gif', width=500))

### The Trainer

Almost there!
We need something to execute the training/validation loop of the model.
In some software implementations, this is called a "trainer" and it is responsible of handling the training/validation logic, and the inference method.

Ideally, we would like our trainer to contain:

- the **training and validation loops** (for a single epoch, over the whole dataset split in batches),
- a **train and validation logic**, in order to loop over the whole dataset for multiple epochs,
- a **test and prediction logic** (i.e. something to evaluate the model over the independent test set, and something to simply output the predictions),
- an **export** function to export the trained model to file for later use and deployment.

In [22]:
class Trainer:
    """Trainer for the network."""

    def __init__(
        self,
        model: Network,
        train_data: Dataset,
        val_data: Dataset,
        epochs: int = 1,
    ) -> None:
        """
        Parameters
        ----------
        model : Network
            Model to train.
        train_data : Dataset
            Training data.
        val_data : Dataset
            Validation data.
        epochs : int
            Number of epochs.
        """
        self.model = model
        self.train_data = train_data
        self.val_data = val_data
        self.epochs = epochs

        # Save the losses
        self.train_loss = []
        self.val_loss = []

    def training_loop(self, epoch: int) -> float:
        """
        Training loop.

        Parameters
        ----------
        epoch : int
            Epoch number.

        Returns
        -------
        float
            Loss.

        Raises
        ------
        ValueError
            If the loss becomes NaN.
        """
        loss = []
        train = tqdm(self.train_data,
                     total=len(self.train_data),
                     desc=f'epoch {epoch:03d}',
                     ncols=79,
                     leave=True)
        for X, y in # YOUR CODE HERE:

            # Forward pass
            loss_step = # YOUR CODE HERE
            if np.isnan(loss_step):
                raise ValueError(
                    f'Some input produced NaN values during epoch {epoch:03d}!'
                )
            loss.append(loss_step)

            # Backward pass
            # YOUR CODE HERE

        # Compute the loss
        loss = # YOUR CODE HERE
        self.train_loss.append(loss)

        return loss

    def validation_loop(self, data: Dataset) -> float:
        """
        Validation loop.

        Parameters
        ----------
        data : Dataset
            Validation data.

        Returns
        -------
        float
            Loss.
        """
        loss = []
        val = tqdm(data, total=len(data), desc='validation', leave=True)
        for X, y in # YOUR CODE HERE:
            loss_step = # YOUR CODE HERE
            loss.append(loss_step)

        # Compute the loss
        loss = # YOUR CODE HERE
        self.val_loss.append(loss)

        return loss

    def train(self) -> None:
        """Train and validate the model."""
        self.train_loss = []
        self.val_loss = []
        for epoch in range(1, self.epochs + 1):
            _ = self.training_loop(epoch)
            _ = self.validation_loop(self.val_data)

    def validate(self) -> None:
        """Validate the model."""
        self.val_loss = []
        return self.validation_loop(self.val_data)

    def test(self, data: Dataset) -> None:
        """Test the model."""
        self.val_loss = []
        return self.validation_loop(data)

    def predict(self, X: NDArray) -> NDArray:
        """
        Predict using the trained model.

        Parameters
        ----------
        X : NDArray
            Input data.

        Returns
        -------
        NDArray
            Output of the model.
        """
        return # YOUR CODE HERE

    def export(self, path: str = 'model.joblib') -> None:
        """
        Export the trained model.
        """
        dump(self.model, path, compress=9)
        print(f'The model has been exported to {path}!')

## From Artificial Intelligence to String Theory

The example below is based **string theory**, and it has become a "standard" in the community.
First, let us introduce the problem, briefly.

String Theory (ST) is a candidate "theory of everything" capable of unifying the four fundamental interactions of nature: weak and strong nuclear forces, electromagnetism, and gravity.
In the usual framework, fields are defined over the theoretical construct of **point particle** $x$, a 0-dimensional mathematical object which travels on a 1-dimensional trajectory, the **worldline** $x = x(\tau)$ parametrized by the "time" $\tau$.
The main idea behind ST is the replacement of the point particle by a 1-dimensional object, called **string**, which forms a 2-dimensional trajectory, the **worldsheet** $x = x(\tau, \sigma)$ parametrized by time $\tau$ and the position on the string $\sigma$, when moving in spacetime.
Fields are then defined using this as "fundamental brick".
Observables (particles) can be defined similarly to the usual Quantum Field Theory (QFT) framework.
However, they enjoy some of the properties offered by ST.

This has some advantages (the 2D worldsheet theory is conformal, thus extremely symmetric and constrained), and some limitations.
For instance, in QFT it is assumed that the target spacetime is $1+3$-dimensional ($D = 4$).
In ST the dimension of spacetime is the result of a computation, and it is $D = 10$.
This can be verified in different ways (more or less elegant):

- in order to have a massless photon in the spectrum, the number of degrees of freedom (the "little group" of the symmetry group) must be such that $D = 10$,
- in order to have a conformal theory without anomalies (central charge), we must require $D = 10$,
- the request of nihilpotency of the BRST charge imposes $D = 10$,
- etc.

As a consequence, we found ourselves to be in a space with $1+9$ dimensions, only $3+1$ actually accessible at our energy.
That is, we do not have a probe with sufficient energy to develop the entire spacetime.
This can be modelled in the following way:

$$
\mathcal{M}^{1,9}
=
\mathcal{M}^{1,3}
\times
X_6.
$$

That is, the $1+9$-dimensional Minkowski space is, in fact, the product of the usual $1+3$-dimensional of "everyday physics", and a **compact** manifold $X_6$, where the remaining $6$ dimensions ($3$ in complex space $\mathbb{C}$) are hidden.
It goes without saying, that the "radius" of the compact manifold is extremely **small**.

These manifolds play a central role in ST.
They are build in order to have good **phenomenological** properties (too long to explain here), and must satisfy some requisites.
They were first theorized by the Italian mathematician Eugenio Calabi, and later proved to exist by the Chinese mathematician Shing-Tung Yau.
They are now known as **Calabi-Yau manifolds** (CY).


A good part of research in ST is on such manifolds, as they are crucial for building phenomenological models.
The different characteristics often depends on algebraic properties of the CYs.
In particular, we are often interested by their **topology**: there exist several **topological invariants** (scalars) which completely characterise the manifold (e.g. the Euler characteristic).

In what follows, we shall focus on the prediction of the **Hodge numbers** $h^{(1,1)}$ and $h^{(2,1)}$ of some particular CYs (in $3$ complex dimensions, thus CY 3-folds).
They represent the dimensions of the cohomology groups of the manifold (in a sense, the number of holes of different dimensions in the manifold).
Phenomenologically, they can be used to compute a number of quantities, such as the number of families of fermions in the spectrum of the theory.
The manifolds we consider are a special class of CYs, built as **intersections** of hypersurfaces in a product of complex **projective spaces**:

$$
\mathbb{CP}^{n_1}
\times
\mathbb{CP}^{n_2}
\times
\dots
\times
\mathbb{CP}^{n_m}.
$$

Each hypersurface is given as **homogeneous equation** with coordinates $Z^i$, $i = 0, \dots, n_k$, for $k = 1, \dots, m$.
Topologically speaking, the most interesting quantity to be computed are the degrees of these equations $a^i_k$.
By including $m$ projective spaces and $k$ equations, these can be rearranged into a matrix with integer entries:

$$
X
=
\begin{pmatrix}
a^1_1 & a^1_2 & \dots & a^1_k \\
a^2_1 & a^2_2 & \dots & a^2_k \\
\vdots & \vdots & \ddots & \vdots \\
a^m_1 & a^m_2 & \dots & a^m_k
\end{pmatrix}.
$$

These manifolds are called **Complete Intersections** CYs (CICYs).

We would like to use a NN architecture to predict **at the same time**:

$$
NN\, \colon\, \mathbb{N}^{m \times k} \to \mathbb{N} \times \mathbb{N},
\quad
X \mapsto NN(X) = (h^{(1, 1)}, h^{(2, 1)}).
$$

However, we do this by **regression**, as we are interested in finding a **function** capable of modeling the relation between the **configuration matrix** and the **Hodge numbers**, rather than a classifier, which would not generalise to unknown samples.

We first load the dataset using the `pandas` library, and we split it into training, validation and test sets.

In [None]:
df = pd.read_json('data/cicy3.json.gz', orient='index')

# Split into training and test sets
train = # YOUR CODE HERE
test = # YOUR CODE HERE
print('Train set:', train.shape)

# Split into validation and test sets
val = # YOUR CODE HERE
test = # YOUR CODE HERE
print('Validation set:', val.shape)
print('Test set:', test.shape)

We then need to arrange the features and the targets (labels) in the right format:

In [None]:
# Split features and targets
X_train = np.array(train['matrix'].to_list())
X_train = X_train.reshape(train.shape[0], -1)
y_train = train[['h11', 'h21']].values
print(f'Shape of train data: X -> {X_train.shape}, y -> {y_train.shape}')

in_features = X_train.shape[1]  # save the number of input features for later
out_features = y_train.shape[1]  # store the shape of the labels for later

X_val = np.array(val['matrix'].to_list())
X_val = X_val.reshape(val.shape[0], -1)
y_val = val[['h11', 'h21']].values
print(f'Shape of val data: X -> {X_val.shape}, y -> {y_val.shape}')

X_test = np.array(test['matrix'].to_list())
X_test = X_test.reshape(test.shape[0], -1)
y_test = test[['h11', 'h21']].values
print(f'Shape of test data: X -> {X_test.shape}, y -> {y_test.shape}')

Let us try to visualise the configuration matrices of a few samples:

In [None]:
fig, ax = plt.subplots(ncols=3, figsize=(18, 5), layout='constrained')

ax[0].imshow(X_train[0].reshape(12, 15))
ax[0].set_title(
    f'$h^{{(1,1)}} = ${y_train[0,0]} | $h^{{(2,1)}} = ${y_train[0,1]}')
ax[1].imshow(X_train[1].reshape(12, 15))
ax[1].set_title(
    f'$h^{{(1,1)}} = ${y_train[1,0]} | $h^{{(2,1)}} = ${y_train[1,1]}')
ax[2].imshow(X_train[2].reshape(12, 15))
ax[2].set_title(
    f'$h^{{(1,1)}} = ${y_train[2,0]} | $h^{{(2,1)}} = ${y_train[2,1]}')

fig.suptitle(
    'Complete Intersection Calaby-Yau Manifolds | Configuration Matrices')
plt.show()

**N.B.**: notice that the NN architecture is "fully connected" and accepts only tabular inputs.
We shall predict the Hodge numbers using the vectorised version of the images, built by concatenating the lines of the configuration matrices one after the other.

Before moving on, we normalise the features, as it is always a good idea to help the machine learning algorithms by having all inputs of the same magnitude.

**N.B.**

All `scikit-learn` objects have roughly the same interface:

- `<object>.fit(X, y)` will fit the `<object>` on features $X$ and targets $y$,
- `<object>.transform(y)` will aplly the transformation fitted before on targets $y$ (the `<object>` must have been fitted),
- `<object>.fit_transform(X, y)` will first call the `fit` method, then apply the transformation.

In [26]:
# Scale the inputs
scaler = StandardScaler()  # play around with other methods!
X_train = # YOUR CODE HERE
X_val = # YOUR CODE HERE
X_test = # YOUR CODE HERE

We then build the datasets using the classes we created earlier:

In [None]:
train = # YOUR CODE HERE
print(f'Lenght of train data: {len(train)} batches')
val = # YOUR CODE HERE
print(f'Lenght of val data: {len(val)} batches')
test = # YOUR CODE HERE
print(f'Lenght of test data: {len(test)} batches')

### Regression Task

We build the NN using the `Network` class.
We choose a regression loss function, as previously anticipated:

In [None]:
layers = [  # play around with other architectures
    Linear(in_features, 128),
    ReLU(),
    Linear(128, 64),
    ReLU(),
    Linear(64, out_features),
    # Identity()
]

model = Network(*layers, criterion= # YOUR CODE HERE
, lr=0.001)
print(model.summary)

We are ready to train the network! 
Let us define a suitable number of epochs, and train the network using the `Trainer` class:

In [None]:
n_epochs = 150  # play around with this!
trainer = Trainer(model, train, val, epochs=n_epochs)
trainer.train()

After training, it is usually good practice to take a look at the training and validation losses.
What can be said about the model and its training?

In [None]:
fig, ax = plt.subplots(figsize=(6, 5), layout='constrained')

# Plot the loss
ax.plot(np.arange(1, n_epochs + 1, dtype='int'),
        trainer.train_loss,
        'k-',
        label='train')
ax.plot(np.arange(1, n_epochs + 1, dtype='int'),
        trainer.val_loss,
        'r--',
        label='val')

ax.set_xlabel('epochs')
ax.set_ylabel('loss')
ax.set_yscale('log')
ax.set_title('Training and validation loss')
ax.legend()
plt.show()

And finally, let us visualise and evaluate our model.
First, consider a classical **QQ-plot** of the predictions, to test the "alignment" of the predictions with the respective ground truths.

In [None]:
# Data
(h11_train, h21_train) = y_train.T
(h11_val, h21_val) = y_val.T

# Predictions
(h11_train_p, h21_train_p) = trainer.predict(X_train).T
(h11_val_p, h21_val_p) = trainer.predict(X_val).T

# Quantiles
h11_train_qq = np.quantile(h11_train, np.linspace(0, 1, num=1000))
h11_train_p_qq = np.quantile(h11_train_p, np.linspace(0, 1, num=1000))

h21_train_qq = np.quantile(h21_train, np.linspace(0, 1, num=1000))
h21_train_p_qq = np.quantile(h21_train_p, np.linspace(0, 1, num=1000))

h11_val_qq = np.quantile(h11_val, np.linspace(0, 1, num=1000))
h11_val_p_qq = np.quantile(h11_val_p, np.linspace(0, 1, num=1000))

h21_val_qq = np.quantile(h21_val, np.linspace(0, 1, num=1000))
h21_val_p_qq = np.quantile(h21_val_p, np.linspace(0, 1, num=1000))

# QQ Plots
fig, ax = plt.subplots(ncols=2, figsize=(12, 5), layout='constrained')

ax[0].scatter(h11_train_qq, h11_train_p_qq, alpha=0.25, color='k', marker='o')
ax[0].plot(h11_train_qq, h11_train_qq, 'r--')
ax[0].set_title('$h^{(1,1)}$')
ax[0].set_xlabel('ground truth (quantiles)')
ax[0].set_ylabel('prediction (quantiles)')

ax[1].scatter(h21_train_qq, h21_train_p_qq, alpha=0.25, color='k', marker='o')
ax[1].plot(h21_train_qq, h21_train_qq, 'r--')
ax[1].set_title('$h^{(2,1)}$')
ax[1].set_xlabel('ground truth (quantiles)')
ax[1].set_ylabel('prediction (quantiles)')

fig.suptitle('Training Data')

plt.show()

fig, ax = plt.subplots(ncols=2, figsize=(12, 5), layout='constrained')

ax[0].scatter(h11_val_qq, h11_val_p_qq, alpha=0.25, color='k', marker='o')
ax[0].plot(h11_val_qq, h11_val_qq, 'r--')
ax[0].set_title('$h^{(1,1)}$')
ax[0].set_xlabel('ground truth (quantiles)')
ax[0].set_ylabel('prediction (quantiles)')

ax[1].scatter(h21_val_qq, h21_val_p_qq, alpha=0.25, color='k', marker='o')
ax[1].plot(h21_val_qq, h21_val_qq, 'r--')
ax[1].set_title('$h^{(2,1)}$')
ax[1].set_xlabel('ground truth (quantiles)')
ax[1].set_ylabel('prediction (quantiles)')

fig.suptitle('Validation Data')

plt.show()

Then, evaluate the model by computing its accuracy.
Notice that, even though the network was trained for a regression task, the predicted observables are ultimately **integers**.
We can **round** the results and compute the metrics _as if_ we were dealing with a classification task: let us use classical classification metrics to assess the goodness of fit.

In [None]:
# Round the predictions
h11_train_p = np.round(h11_train_p, 0).astype('int')
h21_train_p = np.round(h21_train_p, 0).astype('int')

h11_val_p = np.round(h11_val_p, 0).astype('int')
h21_val_p = np.round(h21_val_p, 0).astype('int')

# Compute the metrics
# YOUR CODE HERE

message = [
    'TRAINING',
    '--------',
    f'  > accuracy : h11 = {h11_acc_train:.1%}  | h21 = {h21_acc_train:.1%}',
    f'  > precision: h11 = {h11_prec_train:.1%} | h21 = {h21_prec_train:.1%}',
    f'  > recall   : h11 = {h11_rec_train:.1%}  | h21 = {h21_rec_train:.1%}',
    f'  > f1 score : h11 = {h11_f1_train:.1%}   | h21 = {h21_f1_train:.1%}',
    '',
    'VALIDATION',
    '---------',
    f'  > accuracy : h11 = {h11_acc_val:.1%}  | h21 = {h21_acc_val:.1%}',
    f'  > precision: h11 = {h11_prec_val:.1%} | h21 = {h21_prec_val:.1%}',
    f'  > recall   : h11 = {h11_rec_val:.1%}  | h21 = {h21_rec_val:.1%}',
    f'  > f1 score : h11 = {h11_f1_val:.1%}   | h21 = {h21_f1_val:.1%}',
]
print('\n'.join(message))

Apparently, results are not optimal, but we are peaking up something.
Let us dig deeper to see if we can improve the model architecture.

Final step, let us take a look at the **feature maps** (the intermediate layers) of the model.
Is a pattern starting to emerge?

First, extract a submodel of the network by stopping at an intermediate layer (you can play a bit with this parameter).

In [None]:
# Take the last linear layer
submodel = model.submodel(stop=5)
submodel.graph()

Then, visualise the vectors output by the intermediate layer using **unsupervised** methods for visualisation:

- as the feature map is in high dimensional space, do you know of any unsupervised algorithm for dimensionality reduction?
- as some structure might emerge, do you know any good algorithm which could help us see whether data are grouped together?

In [None]:
feature_maps = submodel.predict(X_train)

# Plot the feature maps
kmeans = KMeans(n_clusters=10)
kmeans_labels = kmeans.fit_predict(feature_maps)

pca = PCA(n_components=2)
feature_maps_pca = pca.fit_transform(feature_maps)

fig, ax = plt.subplots(figsize=(6, 5), layout='constrained')

ax.scatter(  # YOUR CODE HERE,
    # YOUR CODE HERE,
    alpha=0.25,
    c=kmeans_labels,
    cmap='tab10',
    marker='o')

ax.set_xlabel(
    f'1st principal component ({pca.explained_variance_ratio_[0]:.1%})')
ax.set_ylabel(
    f'2nd principal component ({pca.explained_variance_ratio_[1]:.1%})')
ax.set_title('Feature Maps')

plt.show()

Eventually, let us simulate a "deployment" into "production" of our model, and test it on unknown samples in the test set:

1. save the model architecture and parameters to file, to be exported to a different machine (not in this case),
2. load the model from file, and make predictions on the test set,
3. as in this case ground truths are available, compute the metrics and print them.

In [None]:
# Save the model
dump(model, 'model.joblib')

# Reload the model
model = load('model.joblib')
h11_test_p, h21_test_p = model.predict(X_test).T
h11_test, h21_test = y_test.T

# Round the predictions
# YOUR CODE HERE

# Compute the metrics
# YOUR CODE HERE

message = [
    'TEST',
    '----',
    f'  > accuracy : h11 = {h11_acc_test:.1%}  | h21 = {h21_acc_test:.1%}',
    f'  > precision: h11 = {h11_prec_test:.1%} | h21 = {h21_prec_test:.1%}',
    f'  > recall   : h11 = {h11_rec_test:.1%}  | h21 = {h21_rec_test:.1%}',
    f'  > f1 score : h11 = {h11_f1_test:.1%}   | h21 = {h21_f1_test:.1%}',
]
print('\n'.join(message))

### Classification Task

Let us change the point of view and try a _classification_ of $h^{(1, 1)}$ based on the configuration matrix.
This is technically a simpler task, which depends on a strong _prior knowledge_ of the range of variation of the Hodge numbers (no extrapolation possible during inference).

Let us build the class labels:

In [36]:
y_train_class = y_train[..., 0]  # take only h11
y_val_class = # YOUR CODE HERE
y_test_class = # YOUR CODE HERE

How many classes are there in the dataset?

In [37]:
n_classes = # YOUR CODE HERE

Then build the datasets:

In [None]:
train = # YOUR CODE HERE
print(f'Lenght of train data: {len(train)} batches')
val = # YOUR CODE HERE
print(f'Lenght of val data: {len(val)} batches')
test = # YOUR CODE HERE
print(f'Lenght of test data: {len(test)} batches')

We then construct the architecture.
How many neurons should we insert in the last linear layer?

In [None]:
layers = [
    Linear(in_features, 128),
    ReLU(),
    Linear(128, 64),
    ReLU(),
    Linear(64, 32),
    ReLU(),
    Linear(32, n_classes),
]

model = Network(*layers,
                criterion=# YOUR CODE HERE,
                lr=0.001)
print(model.summary)

We are ready to train the network! 
Let us define a suitable number of epochs, and train the network using the `Trainer` class:

In [None]:
n_epochs = 600
trainer = Trainer(model, train, val, epochs=n_epochs)
trainer.train()

As in the previous case, let us take a look at the evolution of the loss function:


In [None]:
fig, ax = plt.subplots(figsize=(6, 5), layout='constrained')

# Plot the loss
ax.plot(np.arange(1, n_epochs + 1, dtype='int'),
        trainer.train_loss,
        'k-',
        label='train')
ax.plot(np.arange(1, n_epochs + 1, dtype='int'),
        trainer.val_loss,
        'r--',
        label='val')

ax.set_xlabel('epochs')
ax.set_ylabel('loss')
ax.set_yscale('log')
ax.set_title('Training and validation loss')
ax.legend()
plt.show()

Let us then visualise the predicted and real distributions using the **QQ-plots**.
Rember that predictions are **vectors** of probabilities: how can we recover the classes?

In [None]:
# Data
h11_train = y_train_class.ravel()
h11_val = y_val_class.ravel()

# Predictions
h11_train_p = np.argmax(trainer.predict(X_train), axis=1)
h11_val_p = np.argmax(trainer.predict(X_val), axis=1)

# Quantiles
h11_train_qq = np.quantile(h11_train, np.linspace(0, 1, num=1000))
h11_train_p_qq = np.quantile(h11_train_p, np.linspace(0, 1, num=1000))

h11_val_qq = np.quantile(h11_val, np.linspace(0, 1, num=1000))
h11_val_p_qq = np.quantile(h11_val_p, np.linspace(0, 1, num=1000))

# QQ Plots
fig, ax = plt.subplots(ncols=2, figsize=(12, 5), layout='constrained')

ax[0].scatter(h11_train_qq, h11_train_p_qq, alpha=0.25, color='k', marker='o')
ax[0].plot(h11_train_qq, h11_train_qq, 'r--')
ax[0].set_title('$h^{(1,1)}$ (training data)')
ax[0].set_xlabel('ground truth (quantiles)')
ax[0].set_ylabel('prediction (quantiles)')

ax[1].scatter(h11_val_qq, h11_val_p_qq, alpha=0.25, color='k', marker='o')
ax[1].plot(h11_val_qq, h11_val_qq, 'r--')
ax[1].set_title('$h^{(1,1)}$ (validation data)')
ax[1].set_xlabel('ground truth (quantiles)')
ax[1].set_ylabel('prediction (quantiles)')

fig.suptitle('QQ plots')
plt.show()

We can then proceed to compute the metrics:

In [None]:
# Compute the metrics
# YOUR CODE HERE

message = [
    'TRAINING',
    '--------',
    f'  > accuracy : h11 = {h11_acc_train:.1%}',
    f'  > precision: h11 = {h11_prec_train:.1%}',
    f'  > recall   : h11 = {h11_rec_train:.1%}',
    f'  > f1 score : h11 = {h11_f1_train:.1%}',
    '',
    'VALIDATION',
    '---------',
    f'  > accuracy : h11 = {h11_acc_val:.1%}',
    f'  > precision: h11 = {h11_prec_val:.1%}',
    f'  > recall   : h11 = {h11_rec_val:.1%}',
    f'  > f1 score : h11 = {h11_f1_val:.1%}',
]
print('\n'.join(message))

Let us then take a look at the feature maps, as done before for regression:

In [None]:
# Take the last linear layer
submodel = model.submodel(stop=7)
submodel.graph()

Then, visualise the vectors output by the intermediate layer using **unsupervised** methods for visualisation:

- as the feature map is in high dimensional space, do you know of any unsupervised algorithm for dimensionality reduction?
- as some structure might emerge, do you know any good algorithm which could help us see whether data are grouped together?

In [None]:
feature_maps = submodel.predict(X_train)

# Plot the feature maps
kmeans = KMeans(n_clusters=10)
kmeans_labels = kmeans.fit_predict(feature_maps)

pca = PCA(n_components=2)
feature_maps_pca = pca.fit_transform(feature_maps)

fig, ax = plt.subplots(figsize=(6, 5), layout='constrained')

ax.scatter(feature_maps_pca[..., 0],
           feature_maps_pca[..., 1],
           alpha=0.25,
           c=kmeans_labels,
           cmap='tab20',
           marker='o')

ax.set_xlabel(
    f'1st principal component ({pca.explained_variance_ratio_[0]:.1%})')
ax.set_ylabel(
    f'2nd principal component ({pca.explained_variance_ratio_[1]:.1%})')
ax.set_title('Feature Maps')

plt.show()

Eventually, let us simulate a "deployment" into "production" of our model, and test it on unknown samples in the test set:

1. save the model architecture and parameters to file, to be exported to a different machine (not in this case),
2. load the model from file, and make predictions on the test set,
3. as in this case ground truths are available, compute the metrics and print them.

In [None]:
# Save the model
dump(model, 'model.joblib')

# Reload the model
model = load('model.joblib')
h11_test_p = # YOUR CODE HERE
h11_test = y_test_class

# Compute the metrics
# YOUR CODE HERE

message = [
    'TEST',
    '----',
    f'  > accuracy : h11 = {h11_acc_test:.1%}',
    f'  > precision: h11 = {h11_prec_test:.1%}',
    f'  > recall   : h11 = {h11_rec_test:.1%}',
    f'  > f1 score : h11 = {h11_f1_test:.1%}',
]
print('\n'.join(message))