## VAEs- Deep Unsupervised Learning Project

In [1]:
import numpy as np
import matplotlib.pyplot as plt
from torchvision import datasets, transforms
import torch
from torch import nn
from torch import optim
import torch.nn.functional as F
from IPython.display import clear_output
import time

### Import libraries
### Choose device
### Define transformations
### Load datasets
### Inspect a batch (for understanding)
### Visualize sample images (for understanding)

In [2]:
# Choose device

device = ("cuda" if torch.cuda.is_available() else "cpu")
print(device)


# Define transformations
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1,), (0.3,))
])

# Load datasets

train_set = datasets.FashionMNIST('Data_FashionMNIST/', download=True, train=True, transform=transform)
trainLoader = torch.utils.data.DataLoader(train_set, batch_size=64, shuffle=True)

test_set = datasets.FashionMNIST('DATA_FashionMNIST/', download=True, train=False, transform=transform)
testLoader = torch.utils.data.DataLoader(test_set, batch_size=64, shuffle=True)

cpu


In [3]:
def visualize_reconstructed_batch(model, data, reconstructed_data):
    # Get a grid of images from the batch
    images = data.view(-1, 28, 28)
    reconstructed_images = reconstructed_data.view(-1, 28, 28)
    
    # Ajustar el tamaño de la cuadrícula
    fig, axs = plt.subplots(2, len(images), figsize=(10, 5))
    
    # Verificar la lógica de indexación
    for i in range(len(reconstructed_images)):
        axs[0, i].imshow(images[i].detach().numpy(), cmap="gray")  # Salia un error relacionado a arrays, añadió detach() y numpy()
        axs[0, i].set_title("Original")
        axs[0, i].axis('off')

        axs[1, i].imshow(reconstructed_images[i].detach().numpy(), cmap="gray")  # Se añadió detach() y numpy()
        axs[1, i].set_title("Reconstructed")
        axs[1, i].axis('off')

    fig.suptitle("Sample Reconstructed Images")
    plt.tight_layout()
    plt.show()



In [4]:
class VAE(nn.Module):
    def __init__(self):
        super(VAE, self).__init__()

        # Encoder
        self.encoder = nn.Sequential(
            nn.Linear(784, 400),
            nn.ReLU(),
            nn.Linear(400, 20),
            nn.ReLU(),
        )

        # Reparameterización
        self.mu = nn.Linear(20, 10)
        self.logvar = nn.Linear(20, 10)

        # Decoder
        self.decoder = nn.Sequential(
            nn.Linear(10, 20),
            nn.ReLU(),
            nn.Linear(20, 400),
            nn.ReLU(),
            nn.Linear(400, 784),
            nn.Sigmoid(),
        )

    def encode(self, x):
        h = self.encoder(x)
        mu = self.mu(h)
        logvar = self.logvar(h)
        return mu, logvar

    def decode(self, z):
        h = self.decoder(z)
        return h

    def reparameterize(self, mu, logvar):
        std = torch.exp(0.5 * logvar)
        eps = torch.randn_like(std)
        return mu + eps * std

    def forward(self, x):
        mu, logvar = self.encode(x)
        z = self.reparameterize(mu, logvar)
        reconstructed_x = self.decode(z)
        return reconstructed_x, mu, logvar

In [5]:
class AutoEncoder(nn.Module):
    def __init__(self, num_hidden=8):
        super().__init__()

        # Set the number of hidden units
        self.num_hidden = num_hidden

        # Define the encoder part of the autoencoder
        self.encoder = nn.Sequential(
            nn.Linear(784, 256),  # input size: 784, output size: 256
            nn.ReLU(),  # apply the ReLU activation function
            nn.Linear(256, self.num_hidden),  # input size: 256, output size: num_hidden
            nn.ReLU(),  # apply the ReLU activation function
        )

        # Define the decoder part of the autoencoder
        self.decoder = nn.Sequential(
            nn.Linear(self.num_hidden, 256),  # input size: num_hidden, output size: 256
            nn.ReLU(),  # apply the ReLU activation function
            nn.Linear(256, 784),  # input size: 256, output size: 784
            nn.Sigmoid(),  # apply the sigmoid activation function to compress the output to a range of (0, 1)
        )

    def forward(self, x, z=None):
        # Pass the input through the encoder
        encoded = self.encoder(x)
        # Pass the encoded representation through the decoder
        
        #Decode input data (reconstruction)
        decoded = self.decoder(encoded)
        # Return both the encoded representation and the reconstructed output
        
        # Decode noise vector (image generation)
        if z is not None:
            generated_image = self.decoder(z)
            return encoded, decoded, generated_image
        else:
            return encoded, decoded, None
        

In [6]:
model = VAE().to(device)

In [7]:
def loss_function(reconstructed_x, x, mu, logvar):
    # Reconstrucción
    reconstruction_loss = nn.MSELoss()(reconstructed_x, x)

    # Regularización KL
    kl_divergence = 0.5 * torch.sum(mu**2 + logvar - logvar.exp() - 1, dim=1)
    kl_divergence = torch.mean(kl_divergence)

    # Pérdida total
    loss = reconstruction_loss + kl_divergence
    return loss

optimizer = optim.Adam(model.parameters(), lr=0.001)

In [None]:

epochs = 10

for epoch in range(epochs):
    for batch_idx, (data, _) in enumerate(trainLoader):
        # Aplanar las imágenes
        data = data.view(data.size(0), -1)

        # Salida del modelo
        reconstructed_x, mu, logvar = model(data)

        # Cálculo de la pérdida
        loss = loss_function(reconstructed_x, data, mu, logvar)

        # Optimización
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Impresión de información
        if batch_idx % 100 == 0:
            print(f'Epoch: {epoch+1}/{epochs}, Batch: {batch_idx}/{len(trainLoader)}, Loss: {loss.item()}')

            # Visualización de imágenes
            clear_output(wait=True)
            visualize_reconstructed_batch(model, data, reconstructed_x)

# Visualizar una imagen generada aleatoriamente
generate_and_visualize_sample(model)


Epoch: 1/10, Batch: 0/938, Loss: -8.515525817871094
