<a href="https://colab.research.google.com/github/MihaiDogariu/CV3/blob/main/laborator/CV%203%20-%20Lab%20%239.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Variational Autoencoder
În acest laborator vom studia modul de funcționare al autoencoder-ului variațional și vom implementa câteva cazuri particulare ale acestuia.

Structura unui autoencoder variațional este cea din Figura 1.
<div>
  <center>
    <img src="https://drive.google.com/uc?export=view&id=19xaohSDC69CnfsICxrGF5dLaMbZk3aPI" width="400" class="center">
    <p>Figura 1. Structura generală a unui autoencoder variațional.</p>
  </center>
</div>

Autoencoder-ul variațional este un caz particular de autoencoder, specializat în generarea de eșantioane noi. El este format din 2 subansamble, asemenea autoencoder-ului:
- encoder - transformă imaginea de intrare într-un descriptor latent: $h=f(x)$
- decoder - transformă descriptorul latent într-o imagine: $r=g(h)=g(f(x))$. Ideal, $g(x) = f^{-1}(x)$, astfel încât $r=x$.

Spre deosebire de autoencoder, obținerea $h=f(x)$ nu se face direct din parcurgerea rețelei neuronale, ci este necesar un pas suplimentar de eșantionare. Fiecare valoare $h$ se obține ca un eșantion extras din distribuția de probabilitate $\mathcal{N}(\mu, \sigma^2)$, unde $\mu$ reprezintă media distribuției, iar $\sigma$ deviația standard, ambele fiind entități ce pot fi învățate de către VAE. Problema apare în momentul în care se extrage un eșantion aleator din această distribuție, deoarece acesta este un proces nedeterminist și nu poate fi învățat (nu poate fi calculat și propagat gradientul pentru o operație aleatoare). Prin urmare, se apelează la o tehnică de reparametrizare, care mută componenta aleatoare în afara rețelei, ca în Figura 2.

<div>
  <center>
    <img src="https://drive.google.com/uc?export=view&id=1WaM5AN4l21xHikF3HqStoS7RKUjsmuod" width="500" class="center">
    <p>Figura 2. Structura generală a unui autoencoder variațional.</p>
  </center>
</div>

În acest fel, $h$ se obține tot ca un eșantion extras din distribuția de probabilitate $\mathcal{N}(\mu, \sigma^2)$, însă acesta este reparametrizat în $\mu+\sigma\times\varepsilon$, cu $\varepsilon\sim\mathcal{N}(0, I)$.

Antrenarea VAE se face minimizând funcția de cost:

$\mathcal{L}=Loss_{reconstrucție}+Loss_{similaritate}$, unde

$Loss_{reconstrucție}=\mathbb{E}_{q_\phi}[\log{p_\theta(x|z))}]$

$Loss_{similaritate}=-D_{KL}(q_\phi(z)||p(z))$

#TODO:
1. Rulați antrenarea autoencoderului variațional cu o dimensiune mai mare a spațiului latent. O capacitate mai mare a rețelei duce, teoretic, la exemple generate mai clare.
1. Rulați antrenarea autoencoderului variațional cu un număr mai mare de epoci și observați care este punctul în care imaginile încep să devină lizibile.
1. Completați codul aferent evaluării modelului.
1. Modificați codul astfel încât să obțineți o tranziție mai lentă între eșantioanele interpolate.
1. Modificați codul astfel încât să obțineți mai puține imagini în spațiul latent 2D.

In [None]:
import sys

%matplotlib inline
import os

import torch
import torch.nn as nn
import torch.nn.functional as F

# Setarea hiperparametrilor

In [None]:
latent_dims = 2
# latent_dims = 10
num_epochs = 10
batch_size = 128
capacity = 64
learning_rate = 1e-3
variational_beta = 1

#Descărcarea și pregătirea bazei de date MNIST Digits

In [None]:
# Descarcarea bazei de date MNIST Digits
import torchvision.transforms as transforms
from torch.utils.data import DataLoader
from torchvision.datasets import MNIST

img_transform = transforms.Compose([
    transforms.ToTensor()
])

train_dataset = MNIST(root='./data/MNIST', download=True, train=True, transform=img_transform)
train_dataloader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)

test_dataset = MNIST(root='./data/MNIST', download=True, train=False, transform=img_transform)
test_dataloader = DataLoader(test_dataset, batch_size=batch_size, shuffle=True)

# Definirea modelului VAE

In [None]:
class Encoder(nn.Module):
    def __init__(self):
        super(Encoder, self).__init__()
        c = capacity
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=c, kernel_size=4, stride=2, padding=1) # out: c x 14 x 14
        self.conv2 = nn.Conv2d(in_channels=c, out_channels=c*2, kernel_size=4, stride=2, padding=1) # out: c x 7 x 7
        self.fc_mu = nn.Linear(in_features=c*2*7*7, out_features=latent_dims) # mu reprezinta vectorul mediilor
        self.fc_logvar = nn.Linear(in_features=c*2*7*7, out_features=latent_dims) # logvar reprezinta vectorul 
            
    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.relu(self.conv2(x))
        x = x.view(x.size(0), -1) # flatten
        x_mu = self.fc_mu(x)
        x_logvar = self.fc_logvar(x)
        return x_mu, x_logvar

class Decoder(nn.Module):
    def __init__(self):
        super(Decoder, self).__init__()
        c = capacity
        self.fc = nn.Linear(in_features=latent_dims, out_features=c*2*7*7)
        self.conv2 = nn.ConvTranspose2d(in_channels=c*2, out_channels=c, kernel_size=4, stride=2, padding=1)
        self.conv1 = nn.ConvTranspose2d(in_channels=c, out_channels=1, kernel_size=4, stride=2, padding=1)
            
    def forward(self, x):
        x = self.fc(x)
        x = x.view(x.size(0), capacity*2, 7, 7) # unflatten
        x = F.relu(self.conv2(x))
        x = torch.sigmoid(self.conv1(x)) # se foloseste sigmoid datorita functiei de reconstructie BCE
        return x
    
class VariationalAutoencoder(nn.Module):
    def __init__(self):
        super(VariationalAutoencoder, self).__init__()
        self.encoder = Encoder()
        self.decoder = Decoder()
    
    def forward(self, x):
        latent_mu, latent_logvar = self.encoder(x)
        latent = self.latent_sample(latent_mu, latent_logvar)
        x_reconstruit = self.decoder(latent)
        return x_reconstruit, latent_mu, latent_logvar
    
    def latent_sample(self, mu, logvar):
        if self.training:
            # reparametrizarea
            std = logvar.mul(0.5).exp_()
            eps = torch.empty_like(std).normal_()
            return eps.mul(std).add_(mu)
        else:
            return mu
    
def vae_loss(x_reconstruit, x, mu, logvar):
    loss_reconstructie = F.binary_cross_entropy(x_reconstruit.view(-1, 784), x.view(-1, 784), reduction='sum')
    loss_similaritate = -0.5 * torch.sum(1 + logvar - mu.pow(2) - logvar.exp()) # divergenta Kullback-Leibler
    return loss_reconstructie + variational_beta * loss_similaritate # variational_beta este un parametru care controleaza aportul celor 2 componente la functia de pierdere finala

In [None]:
vae = VariationalAutoencoder()

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
vae = vae.to(device)

# Antrenarea VAE

In [None]:
optimizer = torch.optim.Adam(params=vae.parameters(), lr=learning_rate, weight_decay=1e-5)

# set to training mode
vae.train()

train_loss_avg = []

print('Training ...')
for epoch in range(num_epochs):
    train_loss_avg.append(0)
    num_batches = 0
    
    for image_batch, _ in train_dataloader:
        
        image_batch = image_batch.to(device)

        # reconstructia x -> codor -> h -> decodor -> x_reconstruit
        image_batch_reconstruit, latent_mu, latent_logvar = vae(image_batch)
        
        # eroarea variationala (Evidence Lower Bound - ELBO)
        loss = vae_loss(image_batch_reconstruit, image_batch, latent_mu, latent_logvar)
        
        # backpropagation
        optimizer.zero_grad()
        loss.backward()
        
        # actualizarea ponderilor retelei
        optimizer.step()
        
        # acumularea erorii la fiecare iteratie
        train_loss_avg[-1] += loss.item()
        num_batches += 1
        
    # medierea erorii pe intreaga epoca
    train_loss_avg[-1] /= num_batches
    print('Epoch [%d / %d] average reconstruction error: %f' % (epoch+1, num_epochs, train_loss_avg[-1]))

# Afișarea curbei erorii de antrenare

In [None]:
import matplotlib.pyplot as plt
plt.ion()

fig = plt.figure()
plt.plot(train_loss_avg)
plt.xlabel('Epochs')
plt.ylabel('Loss')
plt.show()

# Evaluarea pe baza de date de test

In [None]:
# Setarea autoencoderului variational in modul de evaluare
vae.eval()

test_loss_avg, num_batches = 0, 0

#TODO
    
test_loss_avg /= num_batches
print('average reconstruction error: %f' % (test_loss_avg))

# Vizualizarea reconstrucției

In [None]:
# Vizualizarea reconstructiei
import numpy as np
import matplotlib.pyplot as plt
plt.ion()

import torchvision.utils

vae.eval()

def to_img(x):
    x = x.clamp(0, 1) # unele valori pot depasi intervalul [0, 1]
    return x

def show_image(img):
    img = to_img(img)
    npimg = img.numpy()
    plt.imshow(np.transpose(npimg, (1, 2, 0)))

def visualise_output(images, model):

    with torch.no_grad():
    
        images = images.to(device)
        images, _, _ = model(images)
        images = images.cpu()
        # images = to_img(images)
        np_imagegrid = torchvision.utils.make_grid(images[1:50], 10, 5).numpy()
        plt.imshow(np.transpose(np_imagegrid, (1, 2, 0)))
        plt.show()

images, labels = next(iter(test_dataloader))

# Afisarae imaginilor originale
print('Imagini originale')
show_image(torchvision.utils.make_grid(images[1:50],10,5))
plt.show()

# Afisarea imaginilor reconstruite de catre VAE
print('Imagini reconstruite de catre VAE')
visualise_output(images, vae)

# Utilizarea VAE pentru generare de eșantioane noi

In [None]:
vae.eval()

with torch.no_grad():

    # esantionare vectorilor latenti din distributia latenta
    latent = torch.randn(128, latent_dims, device=device)

    # reconstruirea imaginilor din vectorii latenti
    img_recon = vae.decoder(latent)
    img_recon = img_recon.cpu()

    fig, ax = plt.subplots(figsize=(5, 5))
    show_image(torchvision.utils.make_grid(img_recon.data[:100],10,5))
    plt.show()

# Interpolare in spațiul latent

In [None]:
vae.eval()

def interpolation(lambda1, model, img1, img2):
    
    with torch.no_grad():
    
        # vectorul latent al primei imagini
        img1 = img1.to(device)
        latent_1, _ = model.encoder(img1)

        # vectorul latent al celei de-a doua imagini
        img2 = img2.to(device)
        latent_2, _ = model.encoder(img2)

        # interpolarea intre cei doi vectori cu factorul lambda - lambda controleaza numarul pasului de interpolare
        inter_latent = lambda1* latent_1 + (1- lambda1) * latent_2

        # reconstructia imaginii interpolate
        inter_image = model.decoder(inter_latent)
        inter_image = inter_image.cpu()

        return inter_image
    
# sortarea imaginilor in functie de cifra pe care o reprezinta
digits = [[] for _ in range(10)]
for img_batch, label_batch in test_dataloader:
    for i in range(img_batch.size(0)):
        digits[label_batch[i]].append(img_batch[i:i+1])
    if sum(len(d) for d in digits) >= 1000:
        break;

# calcularea parametrilor lambda - cel de-al treilea argument ne arata numarul de pasi de interpolare pe care ii vom realiza; un numar mai mare duce la tranzitii mai lente intre imagini
lambda_range=np.linspace(0,1,10)

fig, axs = plt.subplots(2,5, figsize=(15, 6))
fig.subplots_adjust(hspace = .5, wspace=.001)
axs = axs.ravel()

for ind,l in enumerate(lambda_range):
    inter_image=interpolation(float(l), vae, digits[7][0], digits[1][0])
   
    inter_image = to_img(inter_image)
    
    image = inter_image.numpy()
   
    axs[ind].imshow(image[0,0,:,:], cmap='gray')
    axs[ind].set_title('lambda_val='+str(round(l,1)))
plt.show() 

# Afișarea spațiului latent 2D

In [None]:
# Acest subpunct are sens doar in cazul in care dimensiunea vectorului latent este 2
if latent_dims != 2:
    print('Please change the parameters to two latent dimensions.')
    
with torch.no_grad():
    
    # crearea unui spatiu latent 2D
    latent_x = np.linspace(-1.5,1.5,20)
    latent_y = np.linspace(-1.5,1.5,20)
    latents = torch.FloatTensor(len(latent_y), len(latent_x), 2)
    for i, lx in enumerate(latent_x):
        for j, ly in enumerate(latent_y):
            latents[j, i, 0] = lx
            latents[j, i, 1] = ly
    latents = latents.view(-1, 2) # flatten in batch

    # reconstructia imaginilor din vectorii latenti
    latents = latents.to(device)
    image_recon = vae.decoder(latents)
    image_recon = image_recon.cpu()

    fig, ax = plt.subplots(figsize=(10, 10))
    show_image(torchvision.utils.make_grid(image_recon.data[:400],20,5))
    plt.show()