___
## <span style='color:#0d5874'> Travaux pratiques : introduction aux modèles génératifs </span>
___

<span style='color:Red'>**L’objectif de cette séance de TP est de réaliser un bref tour d’horizon des modèles génératifs auto-encodeur et des génératifs**</span>

---
<span style='color:#0d5874'> **Exercice 1 : Autoencodeurs et génération d’images** </span>

---
Cet exercice présente l’utilisation des autoencodeurs avec PyTorch. Nous allons utiliser un modèle simple, entièrement connecté, permettant de compresser une image en une représentation vectorielle. Cet exercice utilise MNIST comme jeu de données vues précédemment dans le TP1.

![autoencoder.png](autoencoder.png)

In [2]:
# Imports liés à torch
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F

# Matplotlib
import matplotlib.pyplot as plt

device = 'cuda' if torch.cuda.is_available() else 'cpu'

In [3]:
from torchvision import datasets, transforms # On peut inporter directement la dataset de pytorch
from torch.utils.data import DataLoader 
BATCH_SIZE = 128
transform = transforms.Compose([transforms.ToTensor(),  transforms.Normalize((0.5,), (0.5,))])

# On télécharge et on créer la dataset d'entraienement à l'aide du module datasets de torchvision
train_dataset = datasets.MNIST(root='mnist_data', 
                               train=True, 
                               transform=transform,
                               download=True)

# On télécharge et on créer la dataset de test à l'aide du module datasets de torchvision
valid_dataset = datasets.MNIST(root='mnist_data', 
                               train=False, 
                               transform=transform)

# On définit le data loaders d'entraienement . Le data loaders permet de créer des batchs. On doit lui renseigner le batch size.
train_loader = DataLoader(dataset=train_dataset, 
                          batch_size=BATCH_SIZE, 
                          shuffle=True)

# On définit le data loaders de validation . 
valid_loader = DataLoader(dataset=valid_dataset, 
                          batch_size=BATCH_SIZE, 
                          shuffle=False)




---
<span style='color:Green'>**Question**</span>

En utilisant l’interface [nn.Sequential](https://pytorch.org/docs/stable/generated/torch.nn.Sequential.html) de PyTorch, écrire le code qui définit un modèle autoencodeur entièrement connecté.

Ce modèle prend en entrée une image de dimensions 28×28 sous forme d’un vecteur aplati de longeur 784. On définira une variable latent_dimension qui permet de contrôler la taille du code z en sortie de l’encodeur.

Le décodeur devra prendre en entrée un code z de longueur latent_dimension et produire un vecteur aplati de longueur 28×28=784 (identique à l’image). On choisira une valeur raisonnable pour la dimension du code (par exemple, entre 30 et 250).

encoder : 

1. Linear(in_features=784, out_features=1024, bias=True) ReLU()
2. Linear(in_features=1024, out_features=256, bias=True) ReLU()
3. Linear(in_features=256, out_features=128, bias=True)

decoder:

1. Linear(in_features=128, out_features=256, bias=True) ReLU()
2. Linear(in_features=256, out_features=1024, bias=True) ReLU()
3. Linear(in_features=1024, out_features=784, bias=True) Sigmoid()

---

**Indice**
Indice: l’utilisation de la méthode [.view()](https://pytorch.org/docs/stable/generated/torch.Tensor.view.html) ou de la couche [nn.Flatten()](https://pytorch.org/docs/stable/generated/torch.nn.Flatten.html) peut être utile pour ré-arranger les tenseurs avant ou après les couches linéaires. Par exemple, x.view(-1, 1, 28, 28) permet de transformer un tenseur de dimensions 784 en un tenseur de dimensions (batch, 1, 28, 28)…

In [4]:
class Auto_encoder(nn.Module):
    
    def __init__(self,latent_dimension): # On définit 
        super(Auto_encoder, self).__init__()
        
        self.encoder = nn.Sequential(
         ###### Votre code ici ########
        )

        self.decoder = nn.Sequential(
        
            ###### Votre code ici ########
        )
    
    def forward(self, x): # on défini le passage de nos données
            x = torch.flatten(x,1)
            z = self.encoder(x)
            x_hat = self.decoder(z) 
            x_hat = x_hat.view(-1,1,28,28)
            return x_hat
    
auto_encoder = Auto_encoder(latent_dimension = 128)
print(auto_encoder)

Auto_encoder(
  (encoder): Sequential(
    (0): Linear(in_features=784, out_features=1024, bias=True)
    (1): ReLU()
    (2): Linear(in_features=1024, out_features=256, bias=True)
    (3): ReLU()
    (4): Linear(in_features=256, out_features=128, bias=True)
  )
  (decoder): Sequential(
    (0): Linear(in_features=128, out_features=256, bias=True)
    (1): ReLU()
    (2): Linear(in_features=256, out_features=1024, bias=True)
    (3): ReLU()
    (4): Linear(in_features=1024, out_features=784, bias=True)
    (5): Sigmoid()
  )
)


---
<span style='color:Green'>**Question**</span>

---
Initialisez la loss; Comme paramétre de criterion on prendra la L1Loss et l'optimizer Adam avec lr=0.0001.

In [5]:
criterion = nn.L1Loss()
optimizer = torch.optim.Adam(auto_encoder.parameters(),lr=0.0001)

---
<span style='color:Green'>**Question**</span>

---
Créer une fonction validate qui permettra d'évaluer notre modèle elle prendra en paramètre : 
def validate(valid_loader, model, criterion):

In [6]:
def validate(valid_loader, model, criterion):
    '''
    Function for the validation step of the training loop
    '''
   
    model.eval()
    running_loss = 0
    
    for X, y_true in valid_loader:
        ###### Votre code ici ########
        
    running_loss /= len(valid_loader)
    return running_loss

---
<span style='color:Green'>**Question**</span>

---
Créer une fonction Train qui permettra d'entrainer notre modèle elle prendra en paramètre : 
def train(model, train_loader, optimizer, criterion, device, epochs=100): ainsi que la fonction validate précedement implémenté.

--- 
**Note**
Utiliser avant la fontion validate la fontion [.no_grad](https://pytorch.org/docs/stable/generated/torch.no_grad.html) pour désactiver le gradient.

---

In [7]:
def train(model, train_loader, optimizer, criterion, device, epochs=25):
    model = model.to(device)
    model.train()
    
    print('Start Training')
    for epoch in range(epochs):  # loop over the dataset multiple times

        running_loss = 0.0
        for i, (X,y_true) in enumerate(train_loader):
            ###### Votre code ici ########
    
        running_loss /= len(train_loader)
        
        with torch.no_grad():
            running_loss_validate = validate(valid_loader, model, criterion)
            
        print(f" ecpochs: {epoch}, loss: {running_loss:.4f}, loss_validate:{running_loss_validate:.4f}")
            
    print('Finished Training')

In [7]:
train(auto_encoder,train_loader,optimizer,criterion,device)

Start Training
 ecpochs: 0, loss: 0.9629, loss_validate:0.9439
 ecpochs: 1, loss: 0.9378, loss_validate:0.9294
 ecpochs: 2, loss: 0.9215, loss_validate:0.9121
 ecpochs: 3, loss: 0.9103, loss_validate:0.9040
 ecpochs: 4, loss: 0.9027, loss_validate:0.8968
 ecpochs: 5, loss: 0.8966, loss_validate:0.8918
 ecpochs: 6, loss: 0.8932, loss_validate:0.8890
 ecpochs: 7, loss: 0.8903, loss_validate:0.8863
 ecpochs: 8, loss: 0.8879, loss_validate:0.8842
 ecpochs: 9, loss: 0.8857, loss_validate:0.8822
 ecpochs: 10, loss: 0.8840, loss_validate:0.8809
 ecpochs: 11, loss: 0.8827, loss_validate:0.8798
 ecpochs: 12, loss: 0.8816, loss_validate:0.8787
 ecpochs: 13, loss: 0.8805, loss_validate:0.8777
 ecpochs: 14, loss: 0.8796, loss_validate:0.8769
 ecpochs: 15, loss: 0.8787, loss_validate:0.8760
 ecpochs: 16, loss: 0.8779, loss_validate:0.8752
 ecpochs: 17, loss: 0.8771, loss_validate:0.8746
 ecpochs: 18, loss: 0.8765, loss_validate:0.8740
 ecpochs: 19, loss: 0.8760, loss_validate:0.8735
 ecpochs: 20, l

**Visualiser les images produites par l'auto-encoder avec la fonction suivante:**

In [None]:
from torchvision.utils import make_grid

net = auto_encoder.eval()
test_dataloader = DataLoader(valid_dataset, batch_size=128, shuffle=False)


def show_grid(grid):
    plt.imshow(np.transpose(grid.numpy(), (1, 2, 0)))
    plt.show()

def visualize_reconstructions(net, images, device=device):
    # Mode inférence
    with torch.no_grad():
        images = images.to(device)
        reconstructions = net(images)[0]
        image_grid = make_grid(reconstructions[1:50], 10, 5).cpu()
        return image_grid

images, _ = next(iter(test_dataloader))

# Images de test
plt.figure(figsize=(12, 6))
plt.title("Images du jeu de test")
show_grid(make_grid(images[1:50],10,5))

# Reconstruction et visualisation des images reconstruites
plt.figure(figsize=(12, 6))
plt.title("Reconstruction par l'auto-encodeur")
show_grid(visualize_reconstructions(vae_encoder, images))

---
<span style='color:#0d5874'> **Exercice 2 : Autoencodeurs convolutif et génération d’images** </span>

---

Implémentation
Notre modèle sera un auto-encodeur convolutif doté de l’architecture ci-dessous.

Pour l’encodeur :

* une couche de convolution (kernel_size=4, in_channels=1, out_channels=32, stride=2, padding=1, activation ReLU)

* une couche de convolution (kernel_size=4, in_channels=32, out_channels=64, stride=2, padding=1, activation ReLU)

* une couche linéaire (in_features=64*7*7, out_features=latent_dimension)

Pour le décodeur:

* une couche linéaire (in_features=latent_dimension, out_features=64*7*7, activation ReLU)

* une couche de convolution transposée (kernel size=4, in_channels=64, out_channels=32, stride=2, padding=1, activation ReLU)

* une couche de convolution transposée (kernel size=4, in_channels=32, out_channels=1, stride=2, padding=1, activation sigmoide)

--- 
**Note**
Les filtres convolutifs sont choisis de taille 4x4 afin d’éviter des problèmes [d’aliasing](https://distill.pub/2016/deconv-checkerboard/).

<span style='color:Green'>**Question**</span>
---

Compléter l’implémentation ci-dessous de l’auto-encodeur dont l’architecture vient d’être décrite. Cette implémentation utilise l’interface torch.nn.Module dont la documentation peut vous être utile.

En plus de la reconstruction par l’auto-encodeur, on souhaite que la méthode forward() renvoie également le code intermédiaire z (un vecteur de longueur latent_dimension) obtenu après le passage dans le décodeur.

In [None]:
class AutoEncoder_conv(nn.Module):
    def __init__(self, latent_dimension):
        super(AutoEncoder_conv, self).__init__()
        self.encoder = nn.Sequential(nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1),
                                     nn.BatchNorm2d(32), 
                                     nn.ReLU(),
                                     nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
                                     nn.BatchNorm2d(64),
                                     nn.ReLU(),
                                     nn.Flatten(),
                                     nn.Linear(in_features=64*7*7, out_features=latent_dimension)
                                     )
        self.decoder_linear = nn.Linear(in_features=latent_dimension, out_features=64*7*7)
        
        self.decoder = nn.Sequential(nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
                                     nn.BatchNorm2d(32),
                                     nn.ReLU(),
                                     nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1),
                                     nn.BatchNorm2d(1),
                                     nn.Sigmoid(),
                                    )

    def forward(self, x):
        z = self.encoder(x)
        hat_x = F.relu(self.decoder_linear(z))
        hat_x = hat_x.view(-1, 64, 7, 7)
        hat_x = self.decoder(hat_x)
        return hat_x
    
autoencoder_conv = AutoEncoder_conv(latent_dimension=10)
print(autoencoder_conv)

In [None]:
criterion = nn.MSELoss()
optimizer = torch.optim.Adam(autoencoder_conv.parameters(),lr=0.0001)

In [None]:
train(autoencoder_conv,train_loader,optimizer,criterion,device)

**Visualiser les images produites par l'auto-encoder et comparer avec les images précédentes**

---

## <span style='color:#0d5874'>  Auto-encodeurs variationnels  </span> ##

---

![vae-gaussian.png](vae-gaussian.png)

Implémentation
Nous allons à présent implémenter un VAE convolutif qui hérite de la même structure que l’auto-encodeur que nous avons précédemment défini. Pour nous simplifier les choses par la suite, nous allons commencer par séparer le sous-réseau qui définit l’encodeur de celui qui définit le décodeur.

<span style='color:Green'>**Question**</span>

en reprenant ce qui a été fait plus haut pour l’auto-encodeur classique, compléter les implémentations ci-dessous de l’encodeur et du décodeur pour le VAE. On rappelle que, contrairement à l’auto-encodeur, la sortie de l’encodeur est double :

+ le vecteur <span style='color:Red'>mu</span> qui contient la moyenne de la gaussienne dans l’espace latent,
+ le vecteur <span style='color:Red'>sigma</span> qui contient les variances selon les différentes directions de la gaussienne dans l’espace latent.


le vecteur sigma qui contient les variances selon les différentes directions de la gaussienne dans l’espace latent.

Ces valeurs seront les paramètres de la gaussienne associée à une observation x. Ces deux vecteurs ont pour dimension la dimension de l’espace latent.

In [None]:
class Encoder(nn.Module):
    def __init__(self, latent_dimension):
        super(Encoder, self).__init__()
        self.model = nn.Sequential(nn.Conv2d(1, 32, kernel_size=4, stride=2, padding=1),
                                     nn.ReLU(),
                                     nn.Conv2d(32, 64, kernel_size=4, stride=2, padding=1),
                                     nn.ReLU(),
                                     nn.Flatten(),
                                   )
        self.linear1 = nn.Linear(in_features=64*7*7, out_features=latent_dimension)
        self.linear2 = nn.Linear(in_features=64*7*7, out_features=latent_dimension)

    def forward(self, x):
        x = self.model(x)
        x_mu = self.linear1(x)
        x_logvar = self.linear2(x)
        return x_mu, x_logvar

class Decoder(nn.Module):
    def __init__(self, latent_dimension):
        super(Decoder, self).__init__()
        self.linear = nn.Linear(in_features=latent_dimension, out_features=64*7*7)
        self.model = nn.Sequential(nn.ConvTranspose2d(64, 32, kernel_size=4, stride=2, padding=1),
                                     nn.ReLU(),
                                     nn.ConvTranspose2d(32, 1, kernel_size=4, stride=2, padding=1),
                                     nn.Sigmoid(),
                                    )

    def forward(self, z):
        hat_x = F.relu(self.linear(z))
        hat_x = hat_x.view(-1, 64, 7, 7)
        hat_x = self.model(hat_x)
        return hat_x
p

Nous allons à présent combiner l’encodeur et le décodeur pour former l’auto-encodeur variationnel complet. Il y a néanmoins une petite subtilité car nous devons implémenter l’astuce de reparamétrisation. Celle-ci est implémenter dans la méthode latent_sample

Lors d’un passage avant (forward), le schéma suivant doit se dérouler :

1. L’encodeur prend x en entrée et produit la moyenne mu et la variance logvar de la distribution. En pratique, pour créer mu et logvar deux couches linear en fin du décodeur.

2. On tire un échantillon aléatoire z dans l’espace latent à l’aide de la méthode latent_sample. L’échantillonnage est fait selon la distribution gaussienne latente associée à x grâce à la reparamétrisation. Lors de l’inférence, on ne réalisera pas d’échantillonnage mais on se contentera d’utiliser la moyenne de la gaussienne.

3. L’échantillon aléatoire z est passé dans le décodeur de sorte à obtenir la reconstruction x_recon.

In [None]:
class VariationalAutoencoder(nn.Module):
    def __init__(self, latent_dim):
        super(VariationalAutoencoder, self).__init__()
        self.encoder = Encoder(latent_dim)
        self.decoder = Decoder(latent_dim)

    def forward(self, x):
        latent_mu, latent_logvar = self.encoder(x)
        z = self.latent_sample(latent_mu, latent_logvar)
        hat_x = self.decoder(z)
        return hat_x, latent_mu, latent_logvar

    def latent_sample(self, mu, logvar):
        if self.training:
            # the reparameterization trick
            std = logvar.mul(0.5).exp_()
            eps = torch.empty_like(std).normal_()
            return eps.mul(std).add_(mu)
        else:
            return mu
        
vae_encoder = VariationalAutoencoder(latent_dim = 128)
print(vae_encoder)

Enfin, il reste à définir la fonction de coût du VAE. D’après le cours, on cherche à maximiser $\mathcal{L}$. Ici, on choisira de minimiser $-\mathcal{L}$ avec


$\mathcal{L}(\theta,\phi ;\boldsymbol{x}) =  \underbrace{\mathbb{E}_{q_\phi(\boldsymbol z | \boldsymbol x)} \left [ \log p_\theta(\boldsymbol x | \boldsymbol z) \right ]}_{\text{Esperance de la vraisemblance}} - \underbrace{KL\, \left (q_\phi(\boldsymbol z | \boldsymbol x) \, || \, p_\theta(\boldsymbol z)\right)}_{\text{ecart au prior}}$

La fonction de coût pour une reconstruction sur une seule donnée $x^{(i)}$ est approximée par :

$\mathcal{L}(\theta,\phi ;\boldsymbol x^{(i)}) \simeq - \frac{1}{2} \sum_j^d \bigl ( 1 + \log((\sigma_j^{(i)})^2) - (\mu_j^{(i)})^2 - (\sigma_j^{(i)})^2  \bigr) - \log p_\theta(\boldsymbol x^{(i)} | \boldsymbol z^{(i)})$

---
**Note**



Dans la plupart des cas, la vraisemblance est supposée gaussienne et la fonction de coût évaluant la reconstruction correspondera donc à l’erreur quadratique moyenne (F.mse_loss()). Dans notre cas, la distribution des valeurs des pixels de Fashion-MNIST est plutôt bimodale. Les images étant à valeurs entre 0 et 1, il est possible d’utiliser une entropie croisée binaire (F.bce_loss()) et c’est cette version qui donne les meilleurs résultats.

---
Le prior $p_{θ}(z)$ est supposé être donné par une loi normale centrée réduite. La divergence de Kullback-Leibler est alors donnée par:

$KL(q_\phi(\boldsymbol z | \boldsymbol x) || p_\theta(\boldsymbol z)) = \frac{1}{2} \bigl( \text{tr}(\boldsymbol \sigma \boldsymbol I) + \boldsymbol \mu^T \boldsymbol \mu - k - \log \text{det}(\boldsymbol \sigma \boldsymbol I)\big)$

In [None]:
def vae_loss(hat_x, x, mu, logvar):
    reconstruction_loss = F.binary_cross_entropy(hat_x.view(-1, 28*28), x.view(-1, 28*28), reduction='sum')
    kl_divergence = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp())
    return reconstruction_loss + kl_divergence

In [None]:
def validate(valid_loader, model, criterion):
    '''
    Function for the validation step of the training loop
    '''
   
    model.eval()
    running_loss = 0
    
    for X, y_true in valid_loader:
    
        X = X.to(device)
        
        # Forward pass and record loss
        X_hat, latent_mu, latent_logvar = model(X)
        loss = vae_loss(X_hat, X, latent_mu, latent_logvar)
        running_loss += loss.item()
        
    running_loss /= len(valid_loader)
    return running_loss

In [None]:
def train(model, train_loader, optimizer, device, epochs=25):
    model = model.to(device)
    model.train()
    
    print('Start Training')
    for epoch in range(epochs):  # loop over the dataset multiple times

        running_loss = 0.0
        for i, (X,y_true) in enumerate(train_loader):
            
            X = X.to(device)

            # zero the parameter gradients
            optimizer.zero_grad()

            # forward + backward + optimize
            X_hat, latent_mu, latent_logvar = model(X)
            loss = vae_loss(X_hat, X, latent_mu, latent_logvar)
            loss.backward()
            optimizer.step()

            # print statistics
            running_loss += loss.item()
    
        running_loss /= len(train_loader)
        
        with torch.no_grad():
            running_loss_validate = validate(valid_loader, model, criterion)
            
        print(f" ecpochs: {epoch}, loss: {running_loss:.4f}, loss_validate:{running_loss_validate:.4f}")
            
    print('Finished Training')

In [None]:
criterion = vae_loss
optimizer = torch.optim.Adam(vae_encoder.parameters(),lr=0.0001)
train(vae_encoder,train_loader,optimizer,device)

**Visualiser les images produites par le VAE et comparer avec les images précédentes**