# Démo DDPM

## Definition

Les diffusions modèles (ici [DDPM](https://arxiv.org/abs/2006.11239) sont une branche d'algorithmes utilisés pour la génération de données.

![](https://learnopencv.com/wp-content/uploads/2023/01/diffusion-models-forwardbackward_process_ddpm.png)

Le principe est simple, il y a un premier processus de diffusion consistant à ajouter progressivement du bruit à la donnée. Ce processus de diffusion consiste en une chaine de markov où il est possible d'obtenir la donnée à t + 1 (donc un peu plus bruitée) à partir de la donnée à un temps t :
![](https://learnopencv.com/wp-content/uploads/2023/02/denoising-diffusion-probabilistic-models_forward_diffusion_equations_1.png)

où les betas représentent la variance à un temps t, et sont croissants linéairement sur 1000 étapes de 1e-4 (beta 0) à 2e-2 (beta 1000).

Le tout donne en code (avec PyTorch) :



In [None]:
from PIL import Image
import torch as th
from torch import nn
from torchvision.transforms.functional import to_tensor
import matplotlib.pyplot as plt
from tqdm import tqdm
from typing import Literal

In [None]:
!wget https://media.cnn.com/api/v1/images/stellar/prod/190430171751-mona-lisa.jpg

x_0 = to_tensor(Image.open("./190430171751-mona-lisa.jpg"))

In [None]:
T = 100
beta_1 = 1e-3
beta_t = 2e-1

betas = th.linspace(beta_1, beta_t, steps=T)

In [None]:
def q_step(x_t_prev: th.Tensor, t: int) -> th.Tensor:
    z = th.randn_like(x_t_prev)
    return (1 - betas[t]) * x_t_prev + z * betas[t]

In [None]:
x_1 = q_step(x_0, 1)

In [None]:
x_0.size(), x_1.size()

In [None]:
plt.imshow(x_0.permute(1, 2, 0))

In [None]:
plt.imshow(x_1.permute(1, 2, 0))

In [None]:
x_t_list = [x_0]

for t in tqdm(range(1, T)):
    x_t_list.append(q_step(x_t_list[-1], t))

In [None]:
plt.imshow(x_t_list[25].permute(1, 2, 0))

In [None]:
plt.imshow(x_t_list[40].permute(1, 2, 0))

In [None]:
plt.imshow(x_t_list[-1].permute(1, 2, 0))

3 secondes pour "diffuser" une image, c'est trop ! Les auteurs proposent une simplification permettant d'obtenir n'import quel x (de t = 1 à t = T) à partir de la donnée originelle, le x à t = 0 :

![](https://miro.medium.com/v2/resize:fit:660/1*SRUnVsytTzuCWLvu7tA4gA.png)

Ce qui donne en code :

In [None]:
alphas = 1 - betas
alphas_cum_prod = th.cumprod(alphas, dim=0)

In [None]:
def q_sample(x_0: th.Tensor, t: int) -> th.Tensor:
    z = th.randn_like(x_0)
    return th.sqrt(alphas_cum_prod[t]) * x_0 + (1 - alphas_cum_prod[t]) * z

In [None]:
x_10 = q_sample(x_0, 10)
plt.matshow(x_10.permute(1, 2, 0))

In [None]:
x_30 = q_sample(x_0, 30)
plt.matshow(x_30.permute(1, 2, 0))

In [None]:
x_70 = q_sample(x_0, 70)
plt.matshow(x_70.permute(1, 2, 0))

## Reverse process - denoising process

C'est ok pour le processus de diffusion, qu'en est-il pour le coeur du sujet : le processus inverse aka le dé-bruitage ?

Il s'agit aussi d'une chaine de markov : à une étape t, il faut prédire la moyenne et la matrice de covariance qui ont servi à ajouter le bruit à l'étape précédente (pour passer de t - 1 à t) :

![](https://encrypted-tbn0.gstatic.com/images?q=tbn:ANd9GcRCZCpFnGkY_TCOPvvy-W3rOBuKR-ZPTbx6Pg&usqp=CAU)

Le modèle reçoit deux paramètres :
- la donnée bruitée
- l'indice dans la chaine de markov

Les auteurs simplifient le tout (en rapport au processus de diffusion / bruitage amélioré) : *
- on oublie la matrice de covariance du fait que la variance sera constante (matrice identité avec comme facteur beta)
- on ne prédit plus la moyenne de la distribution normale mais directement le bruit

Le tout donne en algorithme :

![](https://huggingface.co/blog/assets/78_annotated-diffusion/training.png)

## U-Net architecture

Il nous faut une architecture de réseau de neurones qui puisse à partir d'une image, produire une image de mêmes dimensions mais dans un espace de canaux / pixels / couleurs différent. Ici l'espace à prédire pour les pixels sera le bruit qui a été ajouté.

L'architecture U-Net sera parfaite : elle a fait ses preuves dans le biomédical pour de la segmentation d'images (plus généralement passer dans un autre espace de couleurs / canaux / pixels). Elle consiste en deux parties :
- un encodeur
- un décodeur
Ces deux parties sont liées au niveau de la sortie de l'encodeur et de l'entrée du décodeur mais aussi par des connexions directes entre les couches de l'encodeur vers le décodeur :

![](https://miro.medium.com/v2/resize:fit:1400/1*f7YOaE4TWubwaFF7Z1fzNw.png)

### L'élément de base : la convolution

![](https://upload.wikimedia.org/wikipedia/commons/0/04/Convolution_arithmetic_-_Padding_strides.gif?20190413174630)

![](https://miro.medium.com/v2/resize:fit:640/1*ZCjPUFrB6eHPRi4eyP6aaA.gif)

Nous allons créer notre block (ou couche) de base comprenant :
- une convolution 3 x 3
- une activation : Mish
- une couche de normalisation : GroupNorm


In [None]:
class ConvBlock(nn.Sequential):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        group_norm: int,
    ) -> None:
        super().__init__(
            nn.Conv2d(
                in_channels,
                out_channels,
                kernel_size=(3, 3),
                padding=(1, 1),
                stride=(1, 1),
            ),
            nn.Mish(),
            nn.GroupNorm(group_norm, out_channels),
        )

In [None]:
block_1 = ConvBlock(3, 8, 4)

In [None]:
x_30.size()

In [None]:
x_30 = x_30.unsqueeze(0)
out = block_1(x_30)

In [None]:
out.size()

### Up sample / down sample

Pour diminuer la taille de nos images latentes (celles entre les couches de l'encodeur) : on applique un pas de 2 pour nos convolutions.

Pour augmenter la taille des images latentes (celles entre les couches du décodeur) : des convolutions transposées à pas de 2.

![](https://miro.medium.com/v2/resize:fit:720/1*kOThnLR8Fge_AJcHrkR3dg.gif)

In [None]:
class StrideConvBlock(nn.Sequential):
    def __init__(
        self,
        in_channels: int,
        out_channels: int,
        norm_groups: int,
        mode: Literal["up", "down"],
    ) -> None:
        conv_constructors = {
            "up": nn.ConvTranspose2d,
            "down": nn.Conv2d,
        }

        super().__init__(
            conv_constructors[mode](
                in_channels,
                out_channels,
                kernel_size=(4, 4),
                padding=(1, 1),
                stride=(2, 2)
            ),
            nn.Mish(),
            nn.GroupNorm(norm_groups, out_channels),
        )

In [None]:
down_block = StrideConvBlock(8, 16, 4, "down")

In [None]:
out_2 = down_block(out)

In [None]:
out.size(), out_2.size()

In [None]:
up_block = StrideConvBlock(16, 8, 4, "up")

In [None]:
out_3 = up_block(out_2)

In [None]:
out_2.size(), out_3.size()

### Time embedding

Nous disposons maintenant des briques de base pour notre U-Net. Il ne manque plus qu'à intégrer le paramètre supplémentaire représentant l'indice de l'étape dans le processus de diffusion.