In [1]:
import os
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import torch
from sklearn.datasets import load_digits
from torchvision.datasets import FashionMNIST
from torch.utils.data import Subset, DataLoader
from torchvision import datasets
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions.multivariate_normal import MultivariateNormal
import math
from torchvision import transforms
from torch.linalg import multi_dot
import gc

# Init



In [2]:

# Controlla la disponibilità della GPU
if torch.cuda.is_available():
    device = torch.device("cuda")  # Imposta il dispositivo sulla GPU
else:
    device = torch.device("cpu")  # Se la GPU non è disponibile, utilizza la CPU



In [3]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [4]:
#ink originale: https://drive.google.com/file/d/0B7EVK8r0v71pZjFTYXZWM3FlRnM/view?usp=drive_link&resourcekey=0-dYn9z10tMJOBAkviAcfdyQ
#spostatelo nel vostro drive e copiatelo in locale (perchè il dataloader carica più velocemente le immagini leggendo da qui che dal vostro drive)
!cp '/content/drive/MyDrive/Generative_AI/datasets/celebA/img_align_celeba.zip' celebA.zip

In [5]:
import zipfile

with zipfile.ZipFile("celebA.zip", 'r') as zip_ref:
    zip_ref.extractall('/content/dataset')

# Priors

In [6]:

#-------------------------------------- Mixture of Gaussians (MoG)
class MoG(nn.Module):
  def __init__(self, latent_dimension, num_components=1,):
    super(MoG, self).__init__()

    self.num_components = num_components
    self.latent_dimension = latent_dimension

    #inizializzo dei tensori "da imparare" che sono le medie delle K componenti
    # di dimensione (Num_components, latent_dimension)
    self.means = nn.Parameter(torch.randn((num_components, latent_dimension),dtype=torch.float32))

    #inizializzo K matrici di covarianza diagonali di ognuna delle K componenti
    #li tratto come log_var cosi che se anche fossero negativi, una volta fatto l'exp e quindi convertiti in var diventano positivi
    self.log_vars = nn.Parameter(torch.randn((num_components,latent_dimension),dtype=torch.float32))

    #inizializzo i pesi di ogni gaussiana e li normalizzo affinchè la somma faccia 1
    self.weights = nn.Parameter(torch.ones(num_components)/num_components )


  def sample(self):

    #campiono una camponente in base ai pesi
    component_index =  torch.multinomial(F.softmax(self.weights, dim=0), 1)

    #scelgo la media e la matrice di covarianza della componente scelta
    mean = self.means[component_index]
    log_var = self.log_vars[component_index]

    #creo la matrice di covarianza a partire dai vettori che ne definiscono le diagonali
    cov_matrix = torch.diag_embed(torch.exp(log_var))

    #creo la multivariance
    m = MultivariateNormal(mean,cov_matrix)

    #campiono lo z
    z_sample = m.sample().to(device)

    return z_sample.to(torch.float32)



  def log_prob(self,z_samples):

    #creo le matrici di covarianza a partire dai vettori che ne definiscono le diagonali
    cov_matrices = torch.diag_embed(torch.exp(self.log_vars))

    #creo K gaussiane mixate
    MoG = MultivariateNormal(self.means.to(torch.float32),cov_matrices.to(torch.float32))

    #reshape da (L, N, latent) in (N, latent)
    z_reshaped = z_samples.view(-1, self.latent_dimension)

    #Calcolo per ogni z le k log_prob (N, k)
    k_log_probs_for_z = MoG.log_prob(z_reshaped.unsqueeze(1).to(torch.float32))

    #Reshape originale (L, N, k)
    k_log_probs_for_z_reshaped = k_log_probs_for_z.view(z_samples.shape[0],z_samples.shape[1], self.num_components).to(torch.float32)

    #normalizzo i pesi affinchè la loro somma faccia 1
    probabilities_weights = F.softmax(self.weights, dim=0)

    #per ciascuno moltiplico le k probabilità per i rispettivi pesi
    weigthed_log_probs = k_log_probs_for_z_reshaped * probabilities_weights

    #sommo tutte le log_probs pesate di ogni z (L, N)
    sum_weigthed_log_probs = weigthed_log_probs.sum(-1)

    return sum_weigthed_log_probs.to(torch.float32)





#---------------------VampPrior: Variational Mixture of Posterior Prior
class VampPrior(nn.Module):
  def __init__(self, input_shape, latent_dimension, possible_pixel_values,encode= None, num_components=1):
    super(VampPrior, self).__init__()

    #sarebbe la dimensione D*D dell'ingresso originale a cui le immagini appartengono
    self.input_shape = input_shape
    self.num_components = num_components
    self.latent_dimension = latent_dimension
    self.encode = encode

    #inizializzo gli pseudo-input
    #creo N pseudo input u (N, sqrt(input_shape), sqrt(input_shape))
    u = torch.rand((num_components, int(math.sqrt(input_shape)), int(math.sqrt(input_shape))))*possible_pixel_values
    #li rendo learnable
    self.u = nn.Parameter(u)

    #inizializzo i pesi di ogni gaussiana e li normalizzo affinchè la somma faccia 1
    self.weights = nn.Parameter(torch.ones(num_components)/num_components )


  def sample(self):

    #campiono una camponente in base ai pesi
    component_index =  torch.multinomial(F.softmax(self.weights, dim=0), 1)

    #do all'encoder gli pseudo-input ottenendo le medie e le log_std
    mean_vectors, log_std_vectors = self.encode(self.u)

    #scelgo la media e la matrice di covarianza della componente scelta
    mean_vector = mean_vectors[component_index]
    log_std_vector = log_std_vectors[component_index]

    #creo la matrice di covarianza a partire dai vettori che ne definiscono le diagonali
    cov_matrix = torch.diag_embed(torch.exp(log_std_vector))

    #creo la multivariance
    m = MultivariateNormal(mean_vector, cov_matrix)

    #campiono lo z
    z_sample = m.sample().to(device)

    return z_sample



  def log_prob(self,z_samples):

    #do all'encoder gli pseudo-input ottenendo le medie e le log_std

    mean_vectors, log_std_vectors = self.encode(self.u)

    #creo le matrici di covarianza a partire dai vettori che ne definiscono le diagonali
    cov_matrices = torch.diag_embed(torch.exp(log_std_vectors))

    #creo K gaussiane mixate
    MoG = MultivariateNormal(mean_vectors,cov_matrices)

    #reshape da (L, N, latent) in (N, latent)
    z_reshaped = z_samples.view(-1, self.latent_dimension)

    #Calcolo per ogni z le k log_prob (N, k)
    k_log_probs_for_z = MoG.log_prob(z_reshaped.unsqueeze(1))

    #Reshape originale (L, N, k)
    k_log_probs_for_z_reshaped = k_log_probs_for_z.view(z_samples.shape[0],z_samples.shape[1], self.num_components)

    #normalizzo i pesi affinchè la loro somma faccia 1
    probabilities_weights = F.softmax(self.weights, dim=0)

    #per ciascuno moltiplico le k probabilità per i rispettivi pesi
    weigthed_log_probs = k_log_probs_for_z_reshaped * probabilities_weights

    #sommo tutte le log_probs pesate di ogni z (L, N)
    sum_weigthed_log_probs = weigthed_log_probs.sum(-1)

    return sum_weigthed_log_probs






#----------------------- GTM-VampPrior:  Generative Topographic Mapping and Variational Mixture of Posterior Prior
class GTM_VampPrior(nn.Module):
  def __init__(self, input_shape, latent_dimension, possible_pixel_values,encode= None, num_components=1, u_dim=10):
    super(GTM_VampPrior, self).__init__()

    #sarebbe la dimensione D*D dell'ingresso originale a cui le immagini appartengono
    self.input_shape = input_shape
    self.num_components = num_components
    self.latent_dimension = latent_dimension
    self.encode = encode

    #creo la rete che implementerà la funzione g che opera sui pseudo inputs
    self.g_net = nn.Sequential(nn.Linear(u_dim*u_dim,number_of_hidden_neurons*2),
                                 nn.BatchNorm1d(number_of_hidden_neurons*2),
                                 nn.LeakyReLU(),
                                 nn.Linear(number_of_hidden_neurons*2,number_of_hidden_neurons),
                                 nn.BatchNorm1d(number_of_hidden_neurons),
                                 nn.Tanh(),
                                 #moltiplico per 2 perchè voglio sia il vettore di media che std (diagonale)
                                 nn.Linear(number_of_hidden_neurons,input_shape),
                                 nn.Sigmoid()
                                 )

    #inizializzo gli pseudo-input
    #creo N pseudo input u (N, 10,10)
    u = torch.rand((num_components, u_dim,u_dim))
    #li rendo learnable
    self.u = nn.Parameter(u)

    #inizializzo i pesi di ogni gaussiana e li normalizzo affinchè la somma faccia 1
    self.weights = nn.Parameter(torch.ones(num_components)/num_components )


  def sample(self):

    #campiono una camponente in base ai pesi
    component_index =  torch.multinomial(F.softmax(self.weights, dim=0), 1)

    #do all'encoder gli pseudo-input ottenendo le medie e le log_std
    #processo gli pseudo-input con una funzion g
    x = torch.flatten(self.u,1)
    pseudo_input_after_g = self.g_net(x)

    mean_vectors, log_std_vectors = self.encode(pseudo_input_after_g*possible_pixel_values)


    #scelgo la media e la matrice di covarianza della componente scelta
    mean_vector = mean_vectors[component_index]
    log_std_vector = log_std_vectors[component_index]

    #creo la matrice di covarianza a partire dai vettori che ne definiscono le diagonali
    cov_matrix = torch.diag_embed(torch.exp(log_std_vector))

    #creo la multivariance
    m = MultivariateNormal(mean_vector,cov_matrix)

    #campiono lo z
    z_sample = m.sample().to(device)

    return z_sample




  def log_prob(self,z_samples):

    #do all'encoder gli pseudo-input ottenendo le medie e le log_std
    #processo gli pseudo-input con una funzion g
    x = torch.flatten(self.u,1)
    pseudo_input_after_g = self.g_net(x)

    mean_vectors, log_std_vectors = self.encode(pseudo_input_after_g*possible_pixel_values)

    #creo le matrici di covarianza a partire dai vettori che ne definiscono le diagonali
    cov_matrices = torch.diag_embed(torch.exp(log_std_vectors))

    #creo K gaussiane mixate
    MoG = MultivariateNormal(mean_vectors,cov_matrices)

    #reshape da (L, N, latent) in (N, latent)
    z_reshaped = z_samples.view(-1, self.latent_dimension)

    #Calcolo per ogni z le k log_prob (N, k)
    k_log_probs_for_z = MoG.log_prob(z_reshaped.unsqueeze(1))

    #Reshape originale (L, N, k)
    k_log_probs_for_z_reshaped = k_log_probs_for_z.view(z_samples.shape[0],z_samples.shape[1], self.num_components)

    #normalizzo i pesi affinchè la loro somma faccia 1
    probabilities_weights = F.softmax(self.weights, dim=0)

    #per ciascuno moltiplico le k probabilità per i rispettivi pesi
    weigthed_log_probs = k_log_probs_for_z_reshaped * probabilities_weights

    #sommo tutte le log_probs pesate di ogni z (L, N)
    sum_weigthed_log_probs = weigthed_log_probs.sum(-1)

    return sum_weigthed_log_probs





#--------------------Flow-based prior
class Flow_Based_prior(nn.Module):
  def __init__(self, latent_dimension):
    super(Flow_Based_prior, self).__init__()

    #divideremo l'input Z a metà, quindi prenderemo metà delle componenti
    self.input_dimension = latent_dimension //2

    self.number_of_neurons = 128
    self.number_of_flows = 8

    self.scale_net = nn.Sequential(nn.Linear(self.input_dimension,self.number_of_neurons),
                                   nn.ELU(),
                                   nn.Linear(self.number_of_neurons,self.number_of_neurons*2),
                                   nn.ELU(),
                                   nn.Linear(self.number_of_neurons*2,self.number_of_neurons*2),
                                   nn.Tanh(),
                                   nn.Linear(self.number_of_neurons*2,self.input_dimension),
                                   nn.Tanh()
                                   )
    #neo creo 8
    self.scale_nets = torch.nn.ModuleList([self.scale_net for _ in range(self.number_of_flows)])

    self.translation_net = nn.Sequential(nn.Linear(self.input_dimension,self.number_of_neurons),
                                nn.LeakyReLU(),
                                nn.Linear(self.number_of_neurons,self.number_of_neurons*2),
                                nn.ELU(),
                                nn.Linear(self.number_of_neurons*2,self.number_of_neurons*2),
                                nn.Tanh(),
                                nn.Linear(self.number_of_neurons*2,self.input_dimension),
                                )

    #neo creo 8
    self.translation_nets = torch.nn.ModuleList([self.translation_net for _ in range(self.number_of_flows)])

    #la distribuzione iniziale da cui partire, ossia N(0,I)
    self.p0 = MultivariateNormal(torch.zeros(latent_dimension).to(device), torch.eye(latent_dimension).to(device))


  def coupling_layer(self, z, index, forward=True):

    #divido l'input in due parti
    (za,zb) = torch.chunk(z,2,1)

    #inizializzo i due output del coupling layer
    ya = 0,
    yb = 0

    #print(" calcolo s e t con ingresso di dimensione ", za.shape)
    s = self.scale_nets[index](za)
    t = self.translation_nets[index](za)

    ya= za

    if forward == False:
      yb = torch.exp(s)*zb + t
    else:
      yb = (zb-t)*torch.exp(-s)

    return torch.cat((ya,yb), 1), s

  def permute(self, z):
    return z.flip(1)

  def log_prob(self,z):
    '''
      Io voglio calcolare il log(p(z)) e so che questo è calcolabile come:
        log(p(z)) = ln(p0(z0=f^-1(x)) ) - sum(ln(det(J_fi(z_i-1))))
    '''
    #in ingresso ho (L, N, latent), lo converto in (L*N, latent)
    L = z.shape[0]
    N = z.shape[1]
    z = z.view((L*N,z.shape[2]))
    #se ho N z allora ho N log_det_J da memorizzare, uno per ogni z
    log_det_J = z.new_zeros(z.shape[0])

    output = z
    #vado da p(x) a p0
    for flow_i in range(self.number_of_flows):
      output, s = self.coupling_layer(output, flow_i, forward=True)
      output = self.permute(output)
      log_det_J = log_det_J + s.sum(dim=1)

    #adesso ho ottenuto che output=z0=f^-1(x) e ho la somma dei logaritmi dei determinanti
    ln_p_z = self.p0.log_prob(output) - log_det_J

    #ritorno nel formato previsto (L, N)
    return ln_p_z.view((L,N))


  def sample(self):

    #campiono dalla prior
    z0 = self.p0.sample()

    z = z0.unsqueeze(0)

    #procedo da p0 a p(x)
    for flow_i in reversed(range(self.number_of_flows)):
      z = self.permute(z)
      z,_ = self.coupling_layer(z, flow_i, forward=False)

    #ritorno un campione di p(x)
    return z




#Householder flow

In [7]:
class HouseholderFlow(nn.Module):
  def __init__(self, latent_space_dimension, number_of_flows):
    super(HouseholderFlow,self).__init__()

    self.latent_space_dimension = latent_space_dimension
    self.number_of_flows = number_of_flows

    #creo number_of_flows dense layer, ciascuno elaborerà un suo Householder vector
    self.linears = nn.ModuleList([nn.Linear(latent_space_dimension, latent_space_dimension) for i in range(number_of_flows)])


    '''
      Ricordo che dato un vi (Householder vector) la relativa matrice di Householder
      Hi si trova come:
                          H_i = I - 2* vi^T v / v v^T
      Ne troveremo K che poi moltiplicheremo per trovare la matrice di trasformazione:
                              U = H1*H2*...*HK
      Dopodichè trasformeremo z cosi:
                                  z' = U*z
      Ricorda inoltre che ogni vettore di Householder ( a parte v0) viene trovato dando
      a un lineare layer il vettore di Householder precedente in ingresso.

      NB: quello sopra è teorico. Per praticità calcoleremo il flusso calcolando tutto a step,
      ossia prima z1, poi z2, poi z3 e cosi via...
    '''

  #in ingresso prende gli z0 samples e il primo Householder vector v0
  def forward(self, z, v):

    #creo la matrice identità (una volta, tanto è la stessa per ogni flow)
    I = torch.eye(self.latent_space_dimension).to(device)

    for i in np.arange(self.number_of_flows):

      #calcolo per ogni z la matrice di Householder Hi usando v_(i-1)
      #per ogni householder vector vi calcolo calcolo ||vi||^2
      norms = torch.norm(v, dim=-1, keepdim=True)
      dot_product = torch.pow(norms,2)

      #per ogni householder vector vi calcolo l'outer product con se stesso
      outer_product = torch.matmul(v.unsqueeze(2), v.unsqueeze(1))

      #adesso posso calcolare vi^T vi / ||vi||^2
      normalized_outer_product = outer_product / dot_product[:,None]

      #per ogni householder vector vi calcolo la relativa matrice di Householder Hi
      H_is = I-2*normalized_outer_product

      #calcolo trasformazione per il flusso corrente
      z = torch.matmul(H_is.unsqueeze(0),torch.transpose(z.unsqueeze(2),2,3)).squeeze(-1)

      #calcolo nuovo vettore di Householder
      v = self.linears[i](v)

    #ritorno gli z trasformati
    return z



# Model

In [8]:


#--------------- Encoder
class Encoder(nn.Module):
  def __init__(self, input_shape_image, latent_space_dimension, number_of_hidden_neurons, L, number_of_flows):
    super(Encoder,self).__init__()

    #numero di campioni per l'approssimazione Monte-Carlo dell'Expected Value
    self.L = L

    self.input_shape_image = input_shape_image

    self.latent_space_dimension = latent_space_dimension

    out_channels = 14
    kernel_size = 4

    '''
        NEW scompongo l'encoder in 3 parti cosi da poter utilizzare l'output di un layer
        specifico senza fare girare due volte l'encoder
    '''
    self.encoder_0 = nn.Sequential(
                   nn.Conv2d(1,out_channels,kernel_size, stride=2),
                   nn.BatchNorm2d(out_channels),
                   nn.LeakyReLU(),

                   nn.Conv2d(out_channels,2*out_channels,kernel_size, stride=2),
                   nn.BatchNorm2d(2*out_channels),
                   nn.LeakyReLU(),

                   nn.Conv2d(2*out_channels,2*out_channels,kernel_size,stride=2),
                   nn.BatchNorm2d(2*out_channels),
                   nn.LeakyReLU(),

                   nn.Conv2d(2*out_channels,3*out_channels,kernel_size, stride=2),
                   nn.BatchNorm2d(3*out_channels),
                   nn.LeakyReLU(),

                   nn.Flatten()
                   )

    #QUESTO è il nuovo layer che useremo per estrarre il primo Householder vector v cosi
    #da renderlo in funzione dell'input x
    self.encoder_1 = nn.LazyLinear(latent_space_dimension)

    self.encoder_2 = nn.Linear(latent_space_dimension, 2*latent_space_dimension)

    self.prior = None

    #creo il modulo per effettuare l'Householder Flow
    self.householderTransformation = HouseholderFlow(latent_space_dimension, number_of_flows)


  def set_prior(self,prior):
    #associo la prior scelta
    self.prior = prior

  def encode(self,x):
    #trasformo il batch da (64, 28, 28) in (64,1,28,28)
    x = x.unsqueeze(1)
    #do alla rete x e prelevo vettore di media e std (diagonale)
    output_0 = self.encoder_0(x.to(torch.float32))
    output_1 = self.encoder_1(output_0)
    output = self.encoder_2(output_1)

    #divido il risultato in due parti: media e std (diagonale)(logaritmica)
    mean_vector, log_std_vector = torch.chunk(output, 2, dim=1)

    return mean_vector, log_std_vector


  def KL_loss(self,log_std_vector,mean_vector,v0, batch_length):

    L = self.L

    #trasformo i logaritmi delle std in std
    std_vector = torch.exp(log_std_vector)

    #siccome devo avere una matrice positiva definita (cholesky decomposition) devo
    #assicurarmi che i valori nelle diagonali non siano proprio zero
    EPS = 1.e-5
    std_vector = torch.clamp(std_vector, EPS,1. - EPS)

    #trasformo le sequenze di varianze in matrici diagonali (covarianza)
    covariance_matrixes = torch.diag_embed(std_vector)

    #calcolo N distribuzioni  multivariate q(z|x) creata da ognuno degli N x
    #QUESTA E' LA DISTRIBUZIONE q(z0|x) DI PARTENZA
    q_z_x = MultivariateNormal(mean_vector, covariance_matrixes)


    '''
      per ciascuna distribuzione campiono L vettori z0
      Nota però che anche se campiono L vettori da ciascuna, il risultato
      conterrà i primi N vettori z0 campionati, poi i secondi N e cosi via fino
      agli L-esimi. Per esempio i primi due z0_1 e z0_2 sono stati campionati da due
      distribuzioni diverse! Quindi non ho blocchi da L vettori z0 appartenenti
      alla stessa distribuzione!
    '''
    #dimensione (L, num_distribuzioni, dim_latente)
    z_samples = q_z_x.rsample((L,)) #r sta per "reparametrization trick"


    #Devo trasformare gli z0 campionati in zk tramite il flusso di Householder partendo da v0
    #(L, num_distribuzioni, dim_latente)
    zk_samples = self.householderTransformation(z_samples, v0)

    '''
      Per gli z0 (campionati dalla distribuzione di base) calcolo ln(q(z0|x))
    '''
    z_log_probs = q_z_x.log_prob(z_samples)

    '''
      mentre per gli zk calcolo ln(p(zk))
    '''

    ln_p_z = self.prior.log_prob(zk_samples)

    '''
      Ora per ogni immagine x io ho campionato L vettori z0, li ho trasformati in zk e per ciascuno ho
      valutato sia ln(q(z0|x)) che ln(p(zk)). Per ogni immagine io volevo calcolare
      l'expected value approssimandolo (Monte Carlo) come:

                        KL = [Sum(ln(q(z0|x)))/L - Sum(ln(p(zk))/L)

      Per ottenere la prima sommatoria, sommo le colonne di z_log_probs, mentre
      per la seconda sommo le colonne della matrice ln_p_z. Dopodichè, ottenuti
      due vettori, li divido per L e li sottraggo tra cosi da ottenere l'approssimazione
      della KL per ogni immagine x in ingresso
    '''

    KL_per_image = z_log_probs.sum(0)/L - ln_p_z.sum(0)/L

    #NB: adesso devo ritornare gli z_k in quanto il decoder calcolerò RE con gli zk
    return KL_per_image, zk_samples


  def sample(self):
    z_sample = self.prior.sample()
    return z_sample


  #La rete ritorna il vettore di media, std (diagonale) e la z campionata
  def forward(self, x):

    #trasformo il batch da (64, 28, 28) in (64,1,28,28)
    x = x.unsqueeze(1)

    #do alla rete x e prelevo vettore di media e std (diagonale)
    output_0 = self.encoder_0(x.to(torch.float32))
    #v0 è il primo Householder vector (N, latent)
    v0 = self.encoder_1(output_0)
    output = self.encoder_2(v0)

    #divido il risultato in tre parti: media e std (diagonale)(logaritmica)
    mean_vector, log_std_vector = torch.chunk(output, 2, dim=1)

    '''
      Ottengo un KL_error (N,) contenente per ogni immagine il relativo KL_error,
      e poi una matrice zk_samples (L, N, dim_latente) trasformati per Householder
    '''
    KL_per_image, zk_samples = self.KL_loss(log_std_vector,mean_vector,v0, x.shape[0])


    return KL_per_image, zk_samples




#-------- Decoder
class Decoder(nn.Module):
  def __init__(self, input_shape_image,latent_space_dimension, number_of_hidden_neurons, possible_pixel_values, L):
    super(Decoder,self).__init__()

    self.L = L

    self.input_shape_image = input_shape_image

    self.latent_space_dimension = latent_space_dimension

    self.possible_pixel_values = possible_pixel_values

    kernel_size = 3
    out_channels = 14

    self.decoder = nn.Sequential(nn.Linear(latent_space_dimension,4096),
                          nn.Unflatten(2, (int(math.sqrt(4096)),int(math.sqrt(4096)))),
                          nn.Upsample(size=[8,8], mode='bilinear', align_corners=False),
                          nn.Conv2d(in_channels=1, out_channels=3*out_channels, kernel_size=kernel_size),
                          nn.BatchNorm2d(3*out_channels),
                          nn.LeakyReLU(),

                          nn.Upsample(size=[16,16], mode='bilinear', align_corners=False),
                          nn.Conv2d(in_channels=3*out_channels, out_channels=2*out_channels, kernel_size=kernel_size),
                          nn.BatchNorm2d(2*out_channels),
                          nn.LeakyReLU(),

                          nn.Upsample(size=[16,16], mode='bilinear', align_corners=False),
                          nn.Conv2d(in_channels=2*out_channels, out_channels=out_channels, kernel_size=kernel_size),
                          nn.BatchNorm2d(out_channels),
                          nn.LeakyReLU(),

                          nn.Flatten(),
                          nn.LazyLinear(input_shape_image*possible_pixel_values),
                          nn.Unflatten(1,(input_shape_image,possible_pixel_values))

                          )

  def decode_sample(self,z_sample):

    #inietto nel decoder:
    z_sample = z_sample.reshape(1,1,z_sample.shape[1])

    logits = self.decoder(z_sample.to(torch.float32))

    #(1,W*H,possible_pixel_values)
    logits = logits.reshape(1, self.input_shape_image, self.possible_pixel_values )

    probabilities = torch.softmax(logits, dim=-1)
    #non applico la softmax per convertirli in probabilità perchè
    probabilities = probabilities.view(-1, self.possible_pixel_values)

    sample = torch.multinomial(probabilities, num_samples=1)

    x = sample.view(self.input_shape_image)
    return x


  def forward(self, z, x):
    #z è una matrice (L, N, dim_latente), la inietto nel decoder per ottenere le logits
    z=z.reshape(self.L*z.shape[1], 1,z.shape[2])

    logits = self.decoder(z.to(torch.float32))

    logits = logits.reshape(self.L, int(logits.shape[0]//self.L), logits.shape[1]*logits.shape[2])

    '''
      Prima di convertire le logits in probabilità, ciascun vettore del tensore
      contiene i logits di TUTTI i pixel [px1-v=v1,...,px1-v=vk, px2-v=1,....], quindi
      devo prima fare un reshape del genere [[px1-v=v1,...,px1-v=vk], [...]] isolando
      solo le probabilità di ogni pixel

    '''
    #(L, N, numero_pixel, possibili_valori)
    logits = logits.reshape((logits.shape[0],logits.shape[1],self.input_shape_image,self.possible_pixel_values))
    #applico la softmax per convertire le logits in probabilità
    probabilities = torch.softmax(logits,3)

    #correggo (per questioni di stabilità) le probabilità troppo basse
    #Devono stare tra 0+EPS < p < 1-EPS
    EPS = 1.e-5
    probabilities = torch.clamp(probabilities, EPS,1. - EPS)

    '''
      Per ogni z iniettato ho ottenuto delle probabilità. Siccome voglio valutare
      l'expected value seguente:
                              E[ln(p(x|z))]
      e sicome lo voglio approssimare con gli L ln(p(x|z)) ottenuti, ossia:
                              E[ln(p(x|z))] = 1/L*Sum(ln(p(x|z)))
      allora tutti i calcoli seguenti servono solo a poter ottenere per ciascuna
      immagine x tutti i ln(p(x|z)), in particolare:
      1) Per ogni z ho una matrice di dimensione(numero_pixel, probabilità_valori)
         e quindi estraggo la probabilità che ha quella particolare componente xi
         in ingresso.
      2) Alla fine per ogni coppia x e z ho un vettore di probabilità per xi, quindi
         quella di x è calcolabile come:
                              p(x|z)=p(x1|z)*p(x2|z)*...*p(xk|z)
         Se però calcolo il logaritmo, che è quello che voglio posso sommarli:
                              ln(p(x|z))=ln(p(x1|z))+ln(p(x2|z))+...+ln(p(xk|z))
      3) Avendone L li sommo e li divido per L

    '''

    #converto ogni pixel in un vettore one_hot
    x_one_hot = F.one_hot(x.long(), num_classes = self.possible_pixel_values)
    x_one_hot = x_one_hot.reshape(x_one_hot.shape[0],x_one_hot.shape[1]*x_one_hot.shape[2]*x_one_hot.shape[3])
    probabilities = probabilities.reshape(probabilities.shape[0],probabilities.shape[1],probabilities.shape[2]*probabilities.shape[3])
    #li converto in logaritmi
    log_probabilities = torch.log(probabilities)

    selected_log_probabilities = x_one_hot * log_probabilities
    #adesso in un unico vettore ho tutti le ln(p(x_i|z)) per lo z.
    #li sommo (L,N), ossia ogni vettore contiene gli ln(p(x|z)) per gli N x
    ln_p_x_z = selected_log_probabilities.sum(2)


    #ogni colonna contiene quindi gli ln(p(x|z)) per lo stesso x.
    #li sommo e li divido per L ottenenedo il reconstruction error per ogni x
    RE_per_image = ln_p_x_z.sum(0) / self.L

    return RE_per_image









#------------ Variational AutoEncoder
class VAE(nn.Module):
  def __init__(self, possible_pixel_values, input_shape_image, latent_space_dimension, number_of_hidden_neurons,L,number_of_flows, type_of_prior=0, mog_components=1):
    super(VAE, self).__init__()

    self.encoder = Encoder(input_shape_image,latent_space_dimension,number_of_hidden_neurons,L,number_of_flows)
    self.decoder = Decoder(input_shape_image,latent_space_dimension,number_of_hidden_neurons,possible_pixel_values,L)

    prior = None
    if type_of_prior == 1:
      #creo una mixture of gaussian a 15 componenti come prior p(z)
      prior = MoG(latent_space_dimension, mog_components)
    elif type_of_prior == 2:
      prior = VampPrior(input_shape_image, latent_space_dimension, possible_pixel_values, self.encoder.encode, num_components= mog_components )
    elif type_of_prior == 3:
      prior = GTM_VampPrior(input_shape_image, latent_space_dimension, possible_pixel_values, self.encoder.encode, num_components= mog_components )
    elif type_of_prior == 4:
      prior = Flow_Based_prior(latent_space_dimension)
    else:
      #p(z)=N(0,I)
      prior = MultivariateNormal(torch.zeros(self.latent_space_dimension), torch.eye(self.latent_space_dimension))

    self.prior = prior

  def initialize(self):
    #aggiorno il riferimento della prior per l'encoder
    self.encoder.set_prior(self.prior)

  def sample(self):
    z_sample = self.encoder.sample()
    return self.decoder.decode_sample(z_sample)

  def forward(self, x):

    #inietto x nell'encoder per ottenere la KL loss e i vettori z campionati (Monte-Carlo)
    KL_loss_per_image, z_samples_per_image = self.encoder.forward(x)

    #inietto nel decoder x per essere ricostruito attraverso gli stessi campioni z
    #e per ottenere il reconstruction error
    RE_loss_per_image = self.decoder.forward(z_samples_per_image,x)

    #sommo per ottenere una approssimazione del ln(p(x)) per ogni immagine
    ln_p = KL_loss_per_image - RE_loss_per_image

    #calcolo la media del batch
    ln_p_mean = ln_p.mean()

    return ln_p_mean


# MAIN

In [None]:
##################################### GPU + Path ##########################################

# Controlla la disponibilità della GPU
if torch.cuda.is_available():
    device = torch.device("cuda")  # Imposta il dispositivo sulla GPU
else:
    device = torch.device("cpu")  # Se la GPU non è disponibile, utilizza la CPU

print("Device utilizzato:", device)
print("Numero di GPU disponibili:", torch.cuda.device_count())


#path dove salvare il modello migliore e i vari output di ogni epoca valida
path_to_model = "/content/drive/MyDrive/Generative_AI/datasets/celebA/model/model_prior_mog_new_net1_HF.pth"
path_to_output = "/content/drive/MyDrive/Generative_AI/datasets/celebA/output/prior_mog_posterior_new_net_1_HF_"





##################################### Dataloader ##########################################

#per motivi di efficienza, scegliere il rescaling e il massimo valore che ogni pixel può assumere
resize_to = 64
max_pixel_value = 20

input_shape_image = resize_to*resize_to
possible_pixel_values = max_pixel_value+1


def load_data():


    # Definisci le trasformazioni da applicare alle immagini durante il caricamento
    transform = transforms.Compose([
        transforms.Resize( (resize_to, resize_to) ), #rescaling di ogni immagine
        transforms.Grayscale(),  # Trasforma l'immagine in bianco e nero
        transforms.ToTensor(),# Converte l'immagine in un tensore
        transforms.Lambda(lambda x: torch.round(x*(max_pixel_value))), #normalizzo i valori dei pixel e li forzo ad essere interi
        transforms.Lambda(lambda x: x.to(torch.float32))
    ])

    # Crea un oggetto ImageFolder per caricare le immagini dalla cartella specificata e applica le trasformazioni definite
    dataset = datasets.ImageFolder('/content/dataset/', transform=transform)

    # Calcola l'indice per dividere il dataset tra training set e validation set
    split_ratio = 0.8  # Ratio di suddivisione (80% per il training set, 20% per il validation set)
    dataset_size = len(dataset)
    split_index = int(split_ratio * dataset_size)

    # Crea due sottoinsiemi distinti per il training set e il validation set
    train_dataset = Subset(dataset, range(0, split_index))
    val_dataset = Subset(dataset, range(split_index, dataset_size))

    # Crea i DataLoader per il training set e il validation set
    batch_size = 32
    training_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=False)
    validation_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)

    return training_loader, validation_loader










################################### Training + validation #####################################

def train_model_on_given_gpu():

    #definisco la dimensione dello spazio latente
    latent_space_dimension = 64 #deve essere pari se utilizzi il Flow-based
    #nuumero di hidden neurons nell'encoder e decoder
    number_of_hidden_neurons = 64 #la sua radice quadrata deve essere intera altrimenti ci sarà un errore
    #numero componenti gaussiane per il MoG o VampPrior
    mog_components = 20
    #Monte-Carlo approximations
    L = 2
    #number of Householder flows
    number_of_flows = 10

    #---- creazione del modello
    #decide il numero di campioni per l'approssimazione Monte-Carlo dell'expected value
    print("Memoria GPU prima della creazione del modello:", torch.cuda.memory_allocated())

    model =  VAE( possible_pixel_values, input_shape_image, latent_space_dimension, number_of_hidden_neurons, L,number_of_flows,type_of_prior=1, mog_components=mog_components)
    model.to(device)
    model.initialize()
    print("Memoria GPU dopo la creazione del modello:", torch.cuda.memory_allocated())

    # Esegui un passaggio di inoltro dummy per il LazyLinear
    inputs = torch.round(torch.rand((5, resize_to, resize_to),dtype=torch.float32)*max_pixel_value).to(device)
    dummy_output = model(inputs).item()

    print("Numero parametri modello: ",sum(p.numel() for p in model.parameters() if p.requires_grad))

    del inputs
    del dummy_output
    gc.collect()

    model = torch.nn.parallel.DataParallel(model)

    #parametri per il learning
    learning_rate = 1e-3

    parameters_to_optimize = [p for p in model.parameters() if p.requires_grad == True]

    optimizer = torch.optim.Adamax(parameters_to_optimize, lr=learning_rate)


    #------ Funzione per salvare una griglia di campioni decodificati dallo spazio latente ogni volta che la validation è migliore
    def sample_and_save(model, name, input_shape):

      model.eval()

      #voglio campionare 16 immagini e le voglio in una griglia 4x4
      n=4
      number_of_grid_cells = n*n
      #quindi dico al modello di campionarmi 16 immagini
      xs = np.zeros((number_of_grid_cells,input_shape))
      for i in np.arange(number_of_grid_cells):
        generated_sample = model.module.sample().cpu() # il .module serve per andare oltre il wrapping di DataParallel
        #lo stacco dal grafo di computazione
        generated_sample = generated_sample.detach().numpy()
        xs[i,:] = generated_sample


      fig, ax = plt.subplots(n, n)
      for i, ax in enumerate(ax.flatten()):
          plottable_image = np.reshape(xs[i], (int(math.sqrt(input_shape)), int(math.sqrt(input_shape))))
          ax.imshow(plottable_image, cmap='gray')
          ax.axis('off')

      plt.savefig(path_to_output+'epoca_' +str(name)+ '.pdf', bbox_inches='tight')
      plt.close()



    #---- Training e validation
    number_of_epochs = 1000
    #fisso il limite massimo di batch di training e validazione
    max_batch_for_training = 800
    max_batch_for_validation = 170


    #qui salvo il migliore modello, ossia quello che ha la loss sulla validazione migliore
    best_model = model
    best_validation_loss = 1000000

    patience = 0
    max_patience = 30

    training_loader, validation_loader = load_data()

    grd_acc = 1 #significa che prima di backpropagare l'errore accumulerò il gradiente di batch_size*grd_acc (ees. 8*4=32 è come se processassi batch da 32)

    for epoch in range(number_of_epochs):
      model.train()
      print("Epoca "+str(epoch)+" _____________________________________________________________________")

      num_batch = 1
      for batch, _ in training_loader:

        #reshaping di ogni batch da (N, 1, W, H) a (N, W, H)
        batch = batch.squeeze(1)

        batch = batch.to(device)
        #print("             Memoria GPU utilizzata prima loss per batch:",num_batch,"  -> ", round(torch.cuda.memory_allocated()*(1e-9),5)," GB")
        #batch = batch.to(torch.float32)

        loss = model.forward(batch)
        #print("             Memoria GPU utilizzata dopo loss per batch:",num_batch,"  -> ", round(torch.cuda.memory_allocated()*(1e-9),5)," GB")
        del batch
        gc.collect()
        torch.cuda.empty_cache()

        #calcolo le derivate parziali della loss rispetto ogni parametro NB: LA mean() E' PERCHE' UTILIZZO N GPU E CIASCUNA RITORNA LA SUA LOSS
        (loss.mean()/grd_acc).backward(retain_graph=True)
        torch.cuda.empty_cache()
        #print("             Memoria GPU utilizzata dopo backward per batch:",num_batch,"  -> ", round(torch.cuda.memory_allocated()*(1e-9),5)," GB")
        #se ho accumulato il gradiente di un numero sufficiente di batch, allora backpropago
        if ( (num_batch % grd_acc) == 0):
            #adesso ogni parametro ha in .grad il gradiente. Aggiorno il suo valore
            optimizer.step()

            #resetto il .grad di ogni parametro (altrimenti sommo quello attuale al successivo che calcoleremo nell'epoca dopo)
            optimizer.zero_grad()


        print("   Loss batch: ",str(num_batch),": ", loss, "          Memoria GPU utilizzata  -> ", round(torch.cuda.memory_allocated()*(1e-9),4), "GB")

            #se ho superato il numero massimo di batch per il training, esco
        if num_batch >= max_batch_for_training:
            break
        else:
            num_batch = num_batch + 1

      #alla fine di ogni epoca, valuto come si comporta la loss col validation set
      print("   ___________________________")
      model.eval()
      validation_loss = 0
      N = 0

      torch.cuda.empty_cache()

      num_batch = 1
      for batch, _ in validation_loader:

        batch = batch.squeeze(1)
        batch = batch.to(device)
        #batch = batch.to(torch.float32)
        loss_i = model.forward(batch)
        validation_loss = validation_loss + loss_i.mean().item()# NB: .mean() SOLO PERCH' UTILIZZO N GPU E QUINDI VOGLIO LA MEDIA DI OGNI LOSS RITORNATA DA OGNI GPU
        N = N +  1

        del batch
        gc.collect()
        torch.cuda.empty_cache()

        print("   Loss validation batch ",str(num_batch),": ",loss_i)
        #se ho superato il numero massimo di batch per il validation, esco
        if num_batch >= max_batch_for_validation:
            break
        else:
            num_batch = num_batch + 1

        del loss_i

      validation_loss = validation_loss/N
      print("   Loss media validation: ",str(validation_loss))

      #se tale modello ha una loss migliore di quella attualmente migliore..
      if validation_loss < best_validation_loss:
        patience = 0
        best_validation_loss = validation_loss
        print("   la loss risulta essere migliore")
        torch.save(model.state_dict(), path_to_model)
        #campiono e salvo
        sample_and_save(model, epoch, input_shape_image)
      else:
        print("   patience= "+ str(patience+1))
        patience = patience + 1

      if patience > max_patience:
        print("")
        print("Patience massimo superato. Fine del training")
        break

      del validation_loss



if __name__=="__main__":

    train_model_on_given_gpu()

Device utilizzato: cuda
Numero di GPU disponibili: 1
Memoria GPU prima della creazione del modello: 0




Memoria GPU dopo la creazione del modello: 1509376
Numero parametri modello:  236496410
Epoca 0 _____________________________________________________________________
   Loss batch:  1 :  tensor(12859.8545, device='cuda:0', grad_fn=<MeanBackward0>)           Memoria GPU utilizzata  ->  2.9603 GB
   Loss batch:  2 :  tensor(12774.9834, device='cuda:0', grad_fn=<MeanBackward0>)           Memoria GPU utilizzata  ->  2.9572 GB
   Loss batch:  3 :  tensor(13021.9512, device='cuda:0', grad_fn=<MeanBackward0>)           Memoria GPU utilizzata  ->  2.9585 GB
   Loss batch:  4 :  tensor(13120.5312, device='cuda:0', grad_fn=<MeanBackward0>)           Memoria GPU utilizzata  ->  2.9575 GB
   Loss batch:  5 :  tensor(12795.3604, device='cuda:0', grad_fn=<MeanBackward0>)           Memoria GPU utilizzata  ->  2.9583 GB
   Loss batch:  6 :  tensor(12805.7354, device='cuda:0', grad_fn=<MeanBackward0>)           Memoria GPU utilizzata  ->  2.9576 GB
   Loss batch:  7 :  tensor(12952.6719, device='cuda:0

# test

In [89]:

N=2
input_shape_image = 50*50
possible_pixel_values = 2+1
x = torch.round(torch.rand((3,50,50))*(possible_pixel_values-1))
print("Input shape",x.shape)
latent_space_dimension = 6
number_of_hidden_neurons = 6
L=2
mog_components = 3


model =  VAE( possible_pixel_values, input_shape_image, latent_space_dimension, number_of_hidden_neurons, L,number_of_flows=10,type_of_prior=1, mog_components=mog_components)
model.initialize()

model.forward(x)

Input shape torch.Size([3, 50, 50])


tensor(2880.0144, grad_fn=<MeanBackward0>)

In [65]:
z=torch.tensor([ 1.6128, -0.7670,  0.7999,  0.1326, -0.3292])
m = torch.tensor([[ 0.9719, -0.0809,  0.0633, -0.0105,  0.2117],
         [-0.0809,  0.7677,  0.1819, -0.0303,  0.6084],
         [ 0.0633,  0.1819,  0.8575,  0.0237, -0.4764],
         [-0.0105, -0.0303,  0.0237,  0.9961,  0.0793],
         [ 0.2117,  0.6084, -0.4764,  0.0793, -0.5931]])
torch.matmul(m,z)

tensor([ 1.6091, -0.7781,  0.8085,  0.1312, -0.3005])

In [None]:
out_channels = 32
kernel_size = 4
latent_space_dimension = 64

x = torch.rand((10,1,64,64))

encoder = nn.Sequential(
                   nn.Conv2d(1,out_channels,kernel_size, stride=2),
                   nn.BatchNorm2d(out_channels),
                   nn.LeakyReLU(),
                   #nn.Dropout(0.25),

                   nn.Conv2d(out_channels,2*out_channels,kernel_size, stride=2),
                   nn.BatchNorm2d(2*out_channels),
                   nn.LeakyReLU(),
                   #nn.Dropout(0.25),

                   nn.Conv2d(2*out_channels,2*out_channels,kernel_size,stride=2),
                   nn.BatchNorm2d(2*out_channels),
                   nn.LeakyReLU(),
                   nn.Dropout(0.25),

                   nn.Conv2d(2*out_channels,3*out_channels,kernel_size, stride=2),
                   nn.BatchNorm2d(3*out_channels),
                   nn.LeakyReLU(),

                   nn.Flatten(),
                   nn.LazyLinear(2*latent_space_dimension)

                   ).to(torch.float32)

z = torch.rand((10, 1, 64))

decoder = nn.Sequential(nn.Linear(latent_space_dimension,4096),
                          nn.Unflatten(2, (int(math.sqrt(4096)),int(math.sqrt(4096)))),
                          nn.Upsample(size=[8,8], mode='bilinear', align_corners=False),
                          nn.Conv2d(in_channels=1, out_channels=3*out_channels, kernel_size=kernel_size),
                          nn.BatchNorm2d(3*out_channels),
                          nn.LeakyReLU(),

                          nn.Upsample(size=[16,16], mode='bilinear', align_corners=False),
                          nn.Conv2d(in_channels=3*out_channels, out_channels=2*out_channels, kernel_size=kernel_size),
                          nn.BatchNorm2d(2*out_channels),
                          nn.LeakyReLU(),

                          nn.Upsample(size=[16,16], mode='bilinear', align_corners=False),
                          nn.Conv2d(in_channels=2*out_channels, out_channels=out_channels, kernel_size=kernel_size),
                          nn.BatchNorm2d(out_channels),
                          nn.LeakyReLU(),

                          nn.Flatten(),
                          nn.LazyLinear(input_shape_image*possible_pixel_values),
                          nn.Unflatten(1,(input_shape_image,possible_pixel_values))

                          )



decoder(z).shape

torch.Size([10, 32, 13, 13])