# Objectif :

Dans ce devoir, vous allez implémenter une classe `DDPM` sur le dataset MNIST en utilisant PyTorch selon les directives. L'objectif est de minimiser la fonction de perte et d'entraîner le modèle pour générer des images MNIST.

Les classes `Train` et `UNet` sont déjà implémentées pour vous. Vous devez implémenter la classe `DDPM` (voir les détails ci-dessous). Les images générées par le modèle seront automatiquement affichées conformément à l'implémentation de la classe `Trainer`. Assurez-vous que les images générées sont affichées dans la sortie, cela sera évalué.

Note :
- **Implémentation de la classe DDPM (20 points).**
- **Entraînement du modèle pour générer des images MNIST raisonnables en 20 époques (10 points).**
- **Rédigez un rapport décrivant les exemples d'images générées par chaque période (10 points). Veuillez noter que la fonction pour générer l'image est déjà fournie.**

---
Veuillez NE PAS changer le code fourni, ajoutez uniquement votre propre code où indiqué. Il est recommandé d'**utiliser une session CPU pour déboguer** lorsque le GPU n'est pas nécessaire puisque Colab ne donne que 12 heures d'accès GPU gratuit à la fois. Si vous utilisez toutes les ressources GPU, vous pouvez envisager d'utiliser les ressources GPU de Kaggle. Merci et bonne chance !

# Configuration prédéterminée et fonctions données (pas besoin de changer)

In [None]:
!pip install labml_nn labml labml_helpers --no-deps
import torch
import torch.utils.data
import torchvision
from torch import nn
from labml_nn.diffusion.ddpm.unet import UNet
from typing import Tuple, Optional
import torch.nn.functional as F
from tqdm import tqdm
from easydict import EasyDict
import matplotlib.pyplot as plt
from torch.cuda.amp import GradScaler, autocast

args = {
    "image_channels": 1,  # Number of channels in the image. 3 for RGB.
    "image_size": 32,  # Image size
    "n_channels": 64,  # Number of channels in the initial feature map
    "channel_multipliers": [
        1,
        2,
        2,
        4,
    ],  # The list of channel numbers at each resolution.
    "is_attention": [
        False,
        False,
        False,
        True,
    ],  # The list of booleans for attention at each resolution
    "n_steps": 1000,  # Number of time steps T
    "nb_save": 5,  # Number of images to save
    "batch_size": 256,  # Batch size
    "n_samples": 16,  # Number of samples to generate
    "learning_rate": 2e-5,  # Learning rate
    "epochs": 20,  # Number of training epochs
    "device": "cuda" if torch.cuda.is_available() else "cpu",  # Device
    "fp16_precision": False
}
args = EasyDict(args)

In [None]:
class MNISTDataset(torchvision.datasets.MNIST):
    def __init__(self):
        transform = torchvision.transforms.Compose(
            [
                torchvision.transforms.Resize(args.image_size),
                torchvision.transforms.ToTensor(),
            ]
        )

        super().__init__(
            ".", train=True, download=True, transform=transform
        )

    def __getitem__(self, item):
        return super().__getitem__(item)[0]

In [None]:
class Trainer:
    def __init__(self, args, DenoiseDiffusion):

        self.eps_model = UNet(
            image_channels=args.image_channels,
            n_channels=args.n_channels,
            ch_mults=args.channel_multipliers,
            is_attn=args.is_attention,
        ).to(args.device)

        self.diffusion = DenoiseDiffusion(
            eps_model=self.eps_model,
            n_steps=args.n_steps,
            device=args.device,
        )

        self.optimizer = torch.optim.Adam(
            self.eps_model.parameters(), lr=args.learning_rate
        )
        self.args = args

    def train_a_round(self, dataloader, scaler):
        for data in dataloader:
            # Move data to device
            data = data.to(args.device)

            # Calculate the loss
            with autocast(enabled=self.args.fp16_precision):
                loss = self.diffusion.loss(data)
            # Zero gradients
            self.optimizer.zero_grad()
            # Backward pass
            scaler.scale(loss).backward()
            scaler.step(self.optimizer)
            scaler.update()

    def run_in_a_row(self, dataloader):
        scaler = GradScaler(enabled=self.args.fp16_precision)
        for current_epoch in tqdm(range(self.args.epochs)):
            self.current_epoch = current_epoch
            self.train_a_round(dataloader, scaler)
            self.sample()

    def sample(self):
        with torch.no_grad():
            # $x_T \sim p(x_T) = \mathcal{N}(x_T; \mathbf{0}, \mathbf{I})$
            x = torch.randn(
                [
                    self.args.n_samples,
                    self.args.image_channels,
                    self.args.image_size,
                    self.args.image_size,
                ],
                device=self.args.device,
            )
            if self.args.nb_save is not None:
                saving_steps = [self.args["n_steps"] - 1]
            # Remove noise for $T$ steps
            for t_ in tqdm(range(self.args.n_steps)):
                # $t$
                t = self.args.n_steps - t_ - 1
                # Sample from $\textcolor{lightgreen}{p_\theta}(x_{t-1}|x_t)$
                x = self.diffusion.p_sample(
                    x, x.new_full((self.args.n_samples,), t, dtype=torch.long)
                )
                if self.args.nb_save is not None and t_ in saving_steps:
                    print(f"Showing/saving samples from epoch {self.current_epoch}")
                    show_save(
                        x,
                        show=True,
                        save=True,
                        file_name=f"./ddpm_plots/epoch_{self.current_epoch}_sample_{t_}.png",
                    )
        return x


def show_save(img_tensor, show=True, save=True, file_name="sample.png"):
    fig, axs = plt.subplots(3, 3, figsize=(10, 10))  # Create a 4x4 grid of subplots
    assert img_tensor.shape[0] >= 9, "Number of images should be at least 9"
    img_tensor = img_tensor[:9]
    for i, ax in enumerate(axs.flat):
        # Remove the channel dimension and convert to numpy
        img = img_tensor[i].squeeze().cpu().numpy()

        ax.imshow(img, cmap="gray")  # Display the image in grayscale
        ax.axis("off")  # Hide the axis

    plt.tight_layout()
    if save:
        plt.savefig(file_name)
    if show:
        plt.show()
    plt.close(fig)

# Terminer l'implémentation du modèle DenoiseDiffusion

Selon ce qui a été couvert dans le cours ([diapositives](https://www.dropbox.com/s/0gu91rovro71q90/Diffusion.pdf?dl=0)),

Le `Trainer`, le `dataset` et le `UNet` sont donnés.

Nous initialisons ${\epsilon_\theta}(x_t, t)$, $\beta_1, \dots, \beta_T$ (programme de variance augmentant linéairement), $\alpha_t = 1 - \beta_t$, $\bar\alpha_t = \prod_{s=1}^t \alpha_s$, $\sigma^2 = \beta$
```python
class DenoiseDiffusion:
    def __init__(self, eps_model: nn.Module, n_steps: int, device: torch.device):
        super().__init__()
        self.eps_model = eps_model
        self.beta = torch.linspace(0.0001, 0.02, n_steps).to(device)
        self.alpha = 1.0 - self.beta
        self.alpha_bar = torch.cumprod(self.alpha, dim=0)
        self.n_steps = n_steps
        self.sigma2 = self.beta
```

## q_xt_x0
Nous devons implémenter la fonction :
```python
    def q_xt_x0(
        self, x0: torch.Tensor, t: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        ...
        return mean, var
```
$$
\begin{align}
q(x_t|x_0) &= \mathcal{N} \Big(x_t; \sqrt{\bar\alpha_t} x_0, (1-\bar\alpha_t) \mathbf{I} \Big)
\end{align}
$$

Conseil : utilisez la fonction gather donnée. En savoir plus sur `gather()` [ici](https://pytorch.org/docs/stable/generated/torch.gather.html).

## q_sample

Nous devons implémenter la fonction pour obtenir des échantillons de $q(x_t|x_0)$.

\begin{align}
q(x_t|x_0) &= \mathcal{N} \Big(x_t; \sqrt{\bar\alpha_t} x_0, (1-\bar\alpha_t) \mathbf{I} \Big)
\end{align}

Indice : l'échantillonnage à partir de $\mathcal{N} \Big(\mu, \sigma^2\Big)$ est identique à l'échantillonnage à partir de $\mathcal{N} \Big(0, I\Big)$, puis mettre à l'échelle et décaler.

## p_sample
Nous devons implémenter la fonction pour obtenir des échantillons de ${p_\theta}(x_{t-1}|x_t)$

\begin{align}
{p_\theta}(x_{t-1} | x_t) &= \mathcal{N}\big(x_{t-1};
{\mu_\theta}(x_t, t), \sigma_t^2 \mathbf{I} \big) \\
{\mu_\theta}(x_t, t)
  &= \frac{1}{\sqrt{\alpha_t}} \Big(x_t -
    \frac{\beta_t}{\sqrt{1-\bar\alpha_t}}{\epsilon_\theta}(x_t, t) \Big)
\end{align}

* `beta` est défini comme $1-\alpha_t$  
* `eps_coef` est défini comme $\frac{\beta}{\sqrt{1-\bar\alpha_t}}$ * `mu_theta` est défini comme $\frac{1}{\sqrt{\alpha_t}}$
* `mu_theta` est défini comme $\frac{1}{\sqrt{\alpha_t}} \Big(x_t -\frac{\beta_t}{\sqrt{1-\bar\alpha_t}}\epsilon_\theta(x_t, t) \Big)$
* `var` est défini comme $\sigma_t^2 \mathbf{I}$

## loss
Nous devons implémenter la fonction pour obtenir la perte :
$$L(\theta) = \mathbb{E}_{t,x_0, \epsilon} \Bigg[ \bigg\Vert
\epsilon - {\epsilon_\theta}(\sqrt{\bar\alpha_t} x_0 + \sqrt{1-\bar\alpha_t}\epsilon, t)
\bigg\Vert^2 \Bigg]$$

où `x_t` est échantillonné à partir de $q(x_t|x_0)$ qui est donné par $\sqrt{\bar\alpha_t} x_0 + \sqrt{1-\bar\alpha_t}\epsilon$


In [None]:
class DenoiseDiffusion:
    def __init__(self, eps_model: nn.Module, n_steps: int, device: torch.device):
        super().__init__()
        self.eps_model = eps_model
        self.beta = torch.linspace(0.0001, 0.02, n_steps).to(device)
        self.alpha = 1.0 - self.beta
        self.alpha_bar = torch.cumprod(self.alpha, dim=0)
        self.n_steps = n_steps
        self.sigma2 = self.beta

    def gather(self, c: torch.Tensor, t: torch.Tensor):
        c_ = c.gather(-1, t)
        return c_.reshape(-1, 1, 1, 1)

    def q_xt_x0(
        self, x0: torch.Tensor, t: torch.Tensor
    ) -> Tuple[torch.Tensor, torch.Tensor]:
        mean = torch.sqrt(self.gather(self.alpha_bar, t)) * x0
        var = 1 - self.gather(self.alpha_bar, t)
        return mean, var

    def q_sample(
        self, x0: torch.Tensor, t: torch.Tensor, eps: Optional[torch.Tensor] = None
    ):
        if eps is None:
            eps = torch.randn_like(x0)
        mean, var = self.q_xt_x0(x0, t)
        return mean + torch.sqrt(var) * eps

    def p_sample(self, xt: torch.Tensor, t: torch.Tensor):
        eps_theta = self.eps_model(xt, t)
        alpha_bar = self.gather(self.alpha_bar, t)
        alpha = self.gather(self.alpha, t)
        beta = 1 - alpha
        eps_coef = beta / torch.sqrt(1 - alpha_bar)
        mu_theta = (xt - eps_coef * eps_theta) / torch.sqrt(alpha)
        var = self.sigma2 * torch.eye(xt.shape[1])
        eps = torch.randn(xt.shape, device=xt.device)
        sample = mu_theta + torch.sqrt(self.gather(self.sigma2, t)) * eps
        return sample

    def loss(self, x0: torch.Tensor, noise: Optional[torch.Tensor] = None):
        batch_size = x0.shape[0]
        t = torch.randint(
            0, self.n_steps, (batch_size,), device=x0.device, dtype=torch.long
        )
        if noise is None:
            noise = torch.randn_like(x0)
        xt = self.q_sample(x0, t, eps=noise)
        eps_theta = self.eps_model(xt, t)
        loss = F.mse_loss(noise, eps_theta)
        return loss


# Commencez l'entraînement une fois que vous avez fini de remplir le code ci-dessus
Temps estimé : Environ `400s` pour chaque époque (`20 époques` au total), si vous ne changez pas les paramètres de configuration. Aucune logique de sauvegarde des points de contrôle du modèle n'est implémentée. N'hésitez pas à l'implémenter si vous en avez besoin. Il y aura des échantillons affichés et sauvegardés (en images `.png`) pendant l'entraînement pour chaque époque. Vous devriez pouvoir trouver les images sauvegardées dans les `Fichiers` sur le côté gauche si vous utilisez Google Colab.

Remarque : `20 époques` au total est juste un paramètre sûr pour générer des images de style MNIST. Normalement, cela devrait commencer à générer des images interprétables autour de `8 époques`. Si vous ne voyez pas cela, il peut y avoir quelque chose de mal avec votre implémentation. Veuillez vérifier votre code avant d'essayer d'avoir plus d'époques d'entraînement. Merci !

In [None]:
trainer = Trainer(args, DenoiseDiffusion)
dataloader = torch.utils.data.DataLoader(
    MNISTDataset(),
    batch_size=args.batch_size,
    shuffle=True,
    drop_last=True,
    num_workers=4,
    pin_memory=True,
)
trainer.run_in_a_row(dataloader)