In [2]:
import numpy as np
import pandas as pd
import math

from sklearn.datasets import load_iris
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from torch.utils.data import DataLoader, Dataset

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from tqdm.auto import tqdm

In [3]:
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available() and torch.backends.mps.is_built():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
    
    
print(f'Actual device: {device}')

Actual device: mps


In [4]:
X = load_iris()['data']
X
min_max = MinMaxScaler()
X = min_max.fit_transform(X)
X[:10]

array([[0.22222222, 0.625     , 0.06779661, 0.04166667],
       [0.16666667, 0.41666667, 0.06779661, 0.04166667],
       [0.11111111, 0.5       , 0.05084746, 0.04166667],
       [0.08333333, 0.45833333, 0.08474576, 0.04166667],
       [0.19444444, 0.66666667, 0.06779661, 0.04166667],
       [0.30555556, 0.79166667, 0.11864407, 0.125     ],
       [0.08333333, 0.58333333, 0.06779661, 0.08333333],
       [0.19444444, 0.58333333, 0.08474576, 0.04166667],
       [0.02777778, 0.375     , 0.06779661, 0.04166667],
       [0.16666667, 0.45833333, 0.08474576, 0.        ]])

In [5]:
class IrisDataset(Dataset):
    def __init__(self, data):
        self.data = torch.tensor(data, dtype = torch.float32)
    
    def __len__(self):
        return len(self.data)
    
    def __getitem__(self, idx):
        return self.data[idx]

dataset = IrisDataset(X)
dataloader = DataLoader(dataset, batch_size = 32, shuffle = True)

for batch in dataloader:
    print(f'{batch.shape}\n{batch}')
    break

torch.Size([32, 4])
tensor([[0.2222, 0.7083, 0.0847, 0.1250],
        [0.9444, 0.4167, 0.8644, 0.9167],
        [0.0833, 0.4583, 0.0847, 0.0417],
        [0.6111, 0.4167, 0.8136, 0.8750],
        [0.5000, 0.2500, 0.7797, 0.5417],
        [0.2222, 0.6250, 0.0678, 0.0833],
        [0.2222, 0.7500, 0.0847, 0.0833],
        [0.6111, 0.4167, 0.7119, 0.7917],
        [0.5556, 0.3333, 0.6949, 0.5833],
        [0.5000, 0.4167, 0.6610, 0.7083],
        [0.3611, 0.2917, 0.5424, 0.5000],
        [0.4167, 0.3333, 0.6949, 0.9583],
        [0.6667, 0.5417, 0.7966, 0.8333],
        [0.1111, 0.5000, 0.1017, 0.0417],
        [0.5278, 0.3333, 0.6441, 0.7083],
        [0.0833, 0.6667, 0.0000, 0.0417],
        [0.1667, 0.4583, 0.0847, 0.0000],
        [0.5833, 0.5000, 0.5932, 0.5833],
        [0.1944, 0.5833, 0.0847, 0.0417],
        [0.8056, 0.6667, 0.8644, 1.0000],
        [0.5833, 0.2917, 0.7288, 0.7500],
        [0.8333, 0.3750, 0.8983, 0.7083],
        [0.2778, 0.7083, 0.0847, 0.0417],
        [0.611

In [6]:
for i in dataloader:
    batch = i

In [29]:
n_layers = 12
latent_space = 8


layers_list = []
last_layer = int(math.log2(latent_space))
last_layer

for idx, i in enumerate(range(n_layers, last_layer, -1)):
    input_dim = i
    output_dim = i - 1
    if input_dim != last_layer:
        layers_list.append(nn.Sequential(nn.Linear(2**input_dim, 2**output_dim), nn.LeakyReLU(), nn.Dropout()))
nn.Sequential(*layers_list)

Sequential(
  (0): Sequential(
    (0): Linear(in_features=4096, out_features=2048, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.5, inplace=False)
  )
  (1): Sequential(
    (0): Linear(in_features=2048, out_features=1024, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.5, inplace=False)
  )
  (2): Sequential(
    (0): Linear(in_features=1024, out_features=512, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.5, inplace=False)
  )
  (3): Sequential(
    (0): Linear(in_features=512, out_features=256, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.5, inplace=False)
  )
  (4): Sequential(
    (0): Linear(in_features=256, out_features=128, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.5, inplace=False)
  )
  (5): Sequential(
    (0): Linear(in_features=128, out_features=64, bias=True)
    (1): LeakyReLU(negative_slope=0.01)
    (2): Dropout(p=0.5, inplace=False)
  )


In [35]:
input_features = 4  
n_layers = 10       
latent_space_dim = 8
dropout_rate = 0.5  

max_neurons = 2 ** n_layers

layers_list = []

layers_list.append(nn.Sequential(
    nn.Linear(input_features, max_neurons),
    nn.LeakyReLU(negative_slope=0.01),
    nn.Dropout(p=dropout_rate)
))

current_dim = max_neurons
while current_dim > latent_space_dim:
    next_dim = current_dim // 2
    layers_list.append(nn.Sequential(
        nn.Linear(current_dim, next_dim),
        nn.LeakyReLU(negative_slope=0.01),
        nn.Dropout(p=dropout_rate)
    ))
    current_dim = next_dim

In [56]:
class VariationalAutoEncoder(nn.Module):
    def __init__(self, 
                 enc_input_dim = 4, 
                 encoder_layers = 5,
                 dropout_rate = 0.5,
                 latent_space_dim = 2):
        super(VariationalAutoEncoder, self).__init__()
        
        max_neurons = 2 ** encoder_layers

        encoder_layers_list = [
            nn.Linear(enc_input_dim, max_neurons),
            nn.LeakyReLU(),
            nn.Dropout(dropout_rate)
        ]
        
        current_dim = max_neurons
        while current_dim > latent_space_dim:
            next_dim = current_dim // 2
            encoder_layers_list.extend(nn.Sequential(
                nn.Linear(current_dim, next_dim),
                nn.LeakyReLU(),
                nn.Dropout(dropout_rate)
            ))
            current_dim = next_dim
        
        encoder_layers_list.append(nn.Linear(current_dim, latent_space_dim))

        self.Encoder = nn.Sequential(*encoder_layers_list)
        
        self.fc_mu = nn.Linear(next_dim, latent_space_dim)
        self.fc_logvar = nn.Linear(next_dim, latent_space_dim)
        
        
    def reparameterize(self, mu, logvar):
        """
        Implementa il trick della rielaborazione per campionare dallo spazio latente.
        Args:
            mu (torch.Tensor): Media della distribuzione latente (dimensione: batch_size x latent_dim).
            logvar (torch.Tensor): Log-varianza della distribuzione latente (dimensione: batch_size x latent_dim).
        Returns:
            z (torch.Tensor): Campione dallo spazio latente (dimensione: batch_size x latent_dim).
        """
        std = torch.exp(0.5 * logvar)  # Calcola lo scarto quadratico medio
        epsilon = torch.randn_like(std)  # Campiona da una distribuzione normale standard
        z = mu + epsilon * std  # Applica il trick della rielaborazione
        return z
        
    def forward(self, x):
        x = self.Encoder(x)
        mu, log_var = self.fc_mu(x), self.fc_logvar(x)
        trick = self.reparameterize(mu, log_var)
        
        return mu, log_var, x, trick.shape
        
        
vae = VariationalAutoEncoder()
vae(batch)

(tensor([[-0.3428, -0.5308],
         [-0.5442, -0.3724],
         [-0.2604, -0.5979],
         [-0.2742, -0.5854],
         [-0.2871, -0.5766],
         [-0.2587, -0.5979],
         [-0.2604, -0.5979],
         [-0.5520, -0.3663],
         [-0.3171, -0.5528],
         [-0.2604, -0.5979],
         [-0.3853, -0.4974],
         [-0.5582, -0.3598],
         [-0.2589, -0.5979],
         [-0.2604, -0.5979],
         [-0.4109, -0.4783],
         [-0.2604, -0.5979],
         [-0.2604, -0.5979],
         [-0.3733, -0.5065],
         [-0.2604, -0.5979],
         [-0.2604, -0.5979],
         [-0.4542, -0.4424],
         [-0.3166, -0.5532]], grad_fn=<AddmmBackward0>),
 tensor([[0.1768, 0.1684],
         [0.4561, 0.1976],
         [0.0613, 0.1585],
         [0.0813, 0.1590],
         [0.0986, 0.1622],
         [0.0597, 0.1571],
         [0.0613, 0.1585],
         [0.4669, 0.1987],
         [0.1403, 0.1663],
         [0.0613, 0.1585],
         [0.2357, 0.1745],
         [0.4763, 0.1981],
         [