## **Tarea 3**

In [11]:
import os
import torch
import torch.nn as nn
from torchvision import datasets, transforms
import hydra
import pytorch_lightning as pl
from omegaconf import DictConfig, OmegaConf
import matplotlib.pyplot as plt
import sys
from pytorch_lightning.callbacks import ModelCheckpoint, EarlyStopping
import numpy as np
from sklearn.manifold import TSNE
import seaborn as sns
import pandas as pd

# Verificar si se está ejecutando dentro de un notebook
def is_notebook() -> bool:
    """
    Comprueba si el código se está ejecutando dentro de un Jupyter Notebook.

    Returns:
        bool: True si es un notebook, False en caso contrario.
    """
    try:
        return 'ipykernel' in sys.modules
    except NameError:
        return False

class Autoencoder(nn.Module):
    def __init__(self, latent_dim=64):
        """
        Inicializa el autoencoder con un espacio latente de dimensión especificada.

        Args:
            latent_dim (int): Dimensión del espacio latente.
        """
        super(Autoencoder, self).__init__()
        self.latent_dim = latent_dim

        # Definición del encoder
        self.encoder = nn.Sequential(
            nn.Conv2d(1, 32, 3, stride=1, padding=1),   # Salida: 32 x 28 x 28
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 64, 3, stride=2, padding=1),  # Salida: 64 x 14 x 14
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 128, 3, stride=2, padding=1), # Salida: 128 x 7 x 7
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Flatten(),
            nn.Linear(128 * 7 * 7, 256),
            nn.ReLU(),
            nn.Linear(256, self.latent_dim)
        )

        # Definición del decoder
        self.decoder = nn.Sequential(
            nn.Linear(self.latent_dim, 256),
            nn.ReLU(),
            nn.Linear(256, 128 * 7 * 7),
            nn.ReLU(),
            nn.Unflatten(1, (128, 7, 7)),
            nn.ConvTranspose2d(128, 64, 3, stride=2, padding=1, output_padding=1), # Salida: 64 x 14 x 14
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.ConvTranspose2d(64, 32, 3, stride=2, padding=1, output_padding=1),  # Salida: 32 x 28 x 28
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 1, kernel_size=3, padding=1),  # Salida: 1 x 28 x 28
            nn.Sigmoid()
        )

    def encode(self, x):
        """
        Codifica las imágenes de entrada en un espacio latente.

        Args:
            x (Tensor): Imágenes de entrada.

        Returns:
            Tensor: Representaciones en el espacio latente.
        """
        return self.encoder(x)

    def decode(self, z):
        """
        Decodifica las representaciones latentes para reconstruir las imágenes.

        Args:
            z (Tensor): Representaciones en el espacio latente.

        Returns:
            Tensor: Imágenes reconstruidas.
        """
        return self.decoder(z)

    def forward(self, x):
        """
        Pasa adelante por el modelo completo (encoder y decoder).

        Args:
            x (Tensor): Imágenes de entrada.

        Returns:
            Tensor: Imágenes reconstruidas.
        """
        z = self.encode(x)
        return self.decode(z)

class LitAutoencoder(pl.LightningModule):
    def __init__(self, config):
        """
        Módulo Lightning que envuelve el autoencoder y maneja el entrenamiento.

        Args:
            config (DictConfig): Configuración del modelo y entrenamiento.
        """
        super(LitAutoencoder, self).__init__()
        self.save_hyperparameters()
        self.learning_rate = config.model.learning_rate
        self.weight_decay = config.model.weight_decay
        self.autoencoder = Autoencoder(latent_dim=config.model.latent_dim)

    def forward(self, x):
        """
        Pasa adelante por el autoencoder.

        Args:
            x (Tensor): Imágenes de entrada.

        Returns:
            Tensor: Imágenes reconstruidas.
        """
        return self.autoencoder(x)

    def _common_step(self, batch, batch_idx, step_type):
        """
        Paso común para entrenamiento, validación y prueba.

        Args:
            batch (tuple): Batch de datos.
            batch_idx (int): Índice del batch.
            step_type (str): Tipo de paso ('train', 'val' o 'test').

        Returns:
            dict: Diccionario con pérdida y métricas.
        """
        x, _ = batch
        x = x.to(self.device)
        z = self.autoencoder.encode(x)
        x_hat = self.autoencoder.decode(z)
        loss = nn.functional.mse_loss(x_hat, x)

        psnr = 20 * torch.log10(1.0 / torch.sqrt(loss))
        latent_norm = torch.norm(z, dim=1).mean()

        # Registro de métricas
        self.log(f'{step_type}/loss', loss)
        self.log(f'{step_type}/psnr', psnr)
        self.log(f'{step_type}/latent_norm', latent_norm)

        return {
            'loss': loss,
            'psnr': psnr,
            'reconstructions': x_hat,
            'originals': x,
            'latent': z
        }

    def training_step(self, batch, batch_idx):
        """
        Paso de entrenamiento.

        Args:
            batch (tuple): Batch de datos.
            batch_idx (int): Índice del batch.

        Returns:
            Tensor: Pérdida de entrenamiento.
        """
        results = self._common_step(batch, batch_idx, 'train')
        return results['loss']

    def validation_step(self, batch, batch_idx):
        """
        Paso de validación.

        Args:
            batch (tuple): Batch de datos.
            batch_idx (int): Índice del batch.
        """
        self._common_step(batch, batch_idx, 'val')

    def test_step(self, batch, batch_idx):
        """
        Paso de prueba.

        Args:
            batch (tuple): Batch de datos.
            batch_idx (int): Índice del batch.
        """
        self._common_step(batch, batch_idx, 'test')

    def configure_optimizers(self):
        """
        Configura los optimizadores y programadores de tasa de aprendizaje.

        Returns:
            dict: Diccionario con optimizador y programador.
        """
        optimizer = torch.optim.AdamW(
            self.parameters(),
            lr=self.learning_rate,
            weight_decay=self.weight_decay
        )
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer,
            mode='min',
            factor=0.5,
            patience=5,
            min_lr=1e-6
        )
        return {
            "optimizer": optimizer,
            "lr_scheduler": {
                "scheduler": scheduler,
                "monitor": "val/loss",
                "frequency": 1
            }
        }

class ReconstructionCallback(pl.Callback):
    def __init__(self, val_samples, save_dir, val_loader, epoch_interval=5, num_samples=10):
        """
        Callback para visualizar reconstrucciones y el espacio latente.

        Args:
            val_samples (Tensor): Muestras de validación.
            save_dir (str): Directorio para guardar resultados.
            val_loader (DataLoader): DataLoader de validación.
            epoch_interval (int): Intervalo de épocas para visualizar.
            num_samples (int): Número de muestras a visualizar.
        """
        super().__init__()
        self.val_samples = val_samples
        self.epoch_interval = epoch_interval
        self.num_samples = num_samples
        self.save_dir = save_dir
        self.val_loader = val_loader
        os.makedirs(save_dir, exist_ok=True)

    def on_validation_epoch_end(self, trainer, pl_module):
        """
        Evento al final de cada época de validación.

        Args:
            trainer (Trainer): Instancia del entrenador.
            pl_module (LightningModule): Modelo actual.
        """
        epoch = trainer.current_epoch
        if epoch % self.epoch_interval == 0 or epoch == trainer.max_epochs - 1:
            val_samples = self.val_samples.to(pl_module.device)
            reconstructed = pl_module(val_samples)
            fig = self.plot_reconstruction(val_samples, reconstructed, epoch)

            save_path = os.path.join(self.save_dir, f'reconstruction_epoch_{epoch}.png')
            plt.savefig(save_path)
            plt.close(fig)

            if epoch == trainer.max_epochs - 1:
                self.visualize_latent_space(pl_module)

    def plot_reconstruction(self, originals, reconstructed, epoch):
        """
        Genera y guarda una figura comparando originales y reconstrucciones.

        Args:
            originals (Tensor): Imágenes originales.
            reconstructed (Tensor): Imágenes reconstruidas.
            epoch (int): Época actual.

        Returns:
            Figure: Figura generada.
        """
        fig = plt.figure(figsize=(20, 4))
        plt.suptitle(f'Epoch {epoch}')

        originals = originals[:self.num_samples].cpu().detach()
        reconstructed = reconstructed[:self.num_samples].cpu().detach()

        for i in range(self.num_samples):
            # Imágenes originales
            ax = plt.subplot(2, self.num_samples, i + 1)
            plt.imshow(originals[i].squeeze(0), cmap='gray')
            if i == 0:
                plt.title("Original")
            plt.axis("off")

            # Imágenes reconstruidas
            ax = plt.subplot(2, self.num_samples, i + 1 + self.num_samples)
            plt.imshow(reconstructed[i].squeeze(0), cmap='gray')
            if i == 0:
                plt.title("Reconstrucción")
            plt.axis("off")

        plt.tight_layout()
        return fig

    def visualize_latent_space(self, pl_module):
        """
        Visualiza el espacio latente usando t-SNE al final del entrenamiento.

        Args:
            pl_module (LightningModule): Modelo entrenado.
        """
        latent_vectors = []
        labels = []

        pl_module.eval()

        with torch.no_grad():
            for batch in self.val_loader:
                x, y = batch
                z = pl_module.autoencoder.encode(x.to(pl_module.device))
                latent_vectors.append(z.cpu())
                labels.extend(y.numpy())

        latent_vectors = torch.cat(latent_vectors, dim=0).numpy()

        tsne = TSNE(n_components=2, random_state=42)
        latent_2d = tsne.fit_transform(latent_vectors)

        plt.figure(figsize=(10, 10))
        scatter = plt.scatter(latent_2d[:, 0], latent_2d[:, 1], c=labels, cmap='tab10')
        plt.colorbar(scatter)
        plt.title('Visualización t-SNE del Espacio Latente')
        plt.savefig(os.path.join(self.save_dir, 'latent_space_tsne.png'))
        plt.close()

# Adaptar para Jupyter o script regular
def main(cfg: DictConfig):
    """
    Función principal que configura los datos, el modelo y ejecuta el entrenamiento.

    Args:
        cfg (DictConfig): Configuración completa del experimento.
    """
    pl.seed_everything(cfg.seed)

    # Transformaciones y normalización
    transform = transforms.Compose([
        transforms.ToTensor(),
        # No se realiza normalización adicional porque la salida es entre 0 y 1
    ])

    # Carga del conjunto de datos completo de entrenamiento
    full_train_dataset = datasets.FashionMNIST(
        root=cfg.dataset.root,
        train=True,
        download=True,
        transform=transform
    )

    # División en entrenamiento y validación
    train_size = int((1 - cfg.dataset.val_split) * len(full_train_dataset))
    val_size = len(full_train_dataset) - train_size
    train_dataset, val_dataset = torch.utils.data.random_split(
        full_train_dataset,
        [train_size, val_size]
    )

    # DataLoader para entrenamiento
    train_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=cfg.dataset.batch_size,
        shuffle=True,
        num_workers=cfg.dataset.num_workers
    )

    # DataLoader para validación
    val_loader = torch.utils.data.DataLoader(
        val_dataset,
        batch_size=cfg.dataset.batch_size,
        shuffle=False,
        num_workers=cfg.dataset.num_workers
    )

    # DataLoader para prueba
    test_loader = torch.utils.data.DataLoader(
        datasets.FashionMNIST(
            root=cfg.dataset.root,
            train=False,
            download=True,
            transform=transform
        ),
        batch_size=cfg.dataset.batch_size,
        shuffle=False,
        num_workers=cfg.dataset.num_workers
    )

    # Inicialización del modelo
    model = LitAutoencoder(cfg)

    # Muestras para visualización de reconstrucciones
    val_samples, _ = next(iter(val_loader))
    val_samples = val_samples[:cfg.visualization.num_samples]

    # Definición de callbacks
    callbacks = [
        ReconstructionCallback(
            val_samples,
            save_dir=cfg.visualization.save_dir,
            val_loader=val_loader,
            epoch_interval=cfg.visualization.epoch_interval,
            num_samples=cfg.visualization.num_samples
        ),
        ModelCheckpoint(
            monitor='val/loss',
            dirpath=os.path.join(cfg.visualization.save_dir, 'checkpoints'),
            filename='autoencoder-{epoch:02d}-{val_loss:.2f}',
            save_top_k=3,
            mode='min'
        ),
        EarlyStopping(
            monitor='val/loss',
            patience=10,
            mode='min'
        )
    ]

    # Configuración del entrenador
    trainer = pl.Trainer(
        max_epochs=cfg.trainer.max_epochs,
        callbacks=callbacks,
        accelerator=cfg.trainer.accelerator,
        devices=cfg.trainer.devices,
        log_every_n_steps=cfg.trainer.log_every_n_steps,
        gradient_clip_val=cfg.trainer.gradient_clip_val,
        precision=cfg.trainer.precision,
        check_val_every_n_epoch=cfg.trainer.check_val_every_n_epoch
    )

    # Entrenamiento del modelo
    trainer.fit(model, train_loader, val_loader)
    # Prueba del modelo
    trainer.test(model, test_loader)

    return model, trainer

if is_notebook():
    # Si es un notebook, se configura OmegaConf manualmente
    cfg = OmegaConf.create({
        'seed': 42,
        'dataset': {
            'name': 'FashionMNIST',
            'root': './data',
            'batch_size': 128,
            'num_workers': 4,
            'val_split': 0.2,
            'normalize': {
                'mean': [0.5],
                'std': [0.5]
            }
        },
        'model': {
            'latent_dim': 64, 
            'learning_rate': 0.001,
            'weight_decay': 1e-5
        },
        'trainer': {
            'max_epochs': 50,  
            'accelerator': 'auto',
            'devices': 1,
            'log_every_n_steps': 50,
            'gradient_clip_val': 0.5,
            'precision': 32,
            'check_val_every_n_epoch': 1
        },
        'visualization': {
            'num_samples': 10,
            'epoch_interval': 5,
            'save_dir': 'visualization_results'
        }
    })
    main(cfg)
else:
    # Si es un script regular, se usa Hydra
    @hydra.main(config_path="conf", config_name="config", version_base=None)
    def hydra_main(cfg: DictConfig):
        """
        Función principal para ejecución con Hydra.

        Args:
            cfg (DictConfig): Configuración completa del experimento.
        """
        main(cfg)

    if __name__ == "__main__":
        hydra_main()


Seed set to 42


Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz to ./data\FashionMNIST\raw\train-images-idx3-ubyte.gz


100%|██████████| 26.4M/26.4M [00:15<00:00, 1.68MB/s]


Extracting ./data\FashionMNIST\raw\train-images-idx3-ubyte.gz to ./data\FashionMNIST\raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz to ./data\FashionMNIST\raw\train-labels-idx1-ubyte.gz


100%|██████████| 29.5k/29.5k [00:00<00:00, 188kB/s]


Extracting ./data\FashionMNIST\raw\train-labels-idx1-ubyte.gz to ./data\FashionMNIST\raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz to ./data\FashionMNIST\raw\t10k-images-idx3-ubyte.gz


100%|██████████| 4.42M/4.42M [00:02<00:00, 2.00MB/s]


Extracting ./data\FashionMNIST\raw\t10k-images-idx3-ubyte.gz to ./data\FashionMNIST\raw

Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz
Downloading http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz to ./data\FashionMNIST\raw\t10k-labels-idx1-ubyte.gz


100%|██████████| 5.15k/5.15k [00:00<00:00, 5.15MB/s]


Extracting ./data\FashionMNIST\raw\t10k-labels-idx1-ubyte.gz to ./data\FashionMNIST\raw



GPU available: False, used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs

  | Name        | Type        | Params | Mode 
----------------------------------------------------
0 | autoencoder | Autoencoder | 3.4 M  | train
----------------------------------------------------
3.4 M     Trainable params
0         Non-trainable params
3.4 M     Total params
13.747    Total estimated model params size (MB)
29        Modules in train mode
0         Modules in eval mode


Sanity Checking: |          | 0/? [00:00<?, ?it/s]

Training: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

Validation: |          | 0/? [00:00<?, ?it/s]

`Trainer.fit` stopped: `max_epochs=50` reached.
c:\Users\dnlal\AppData\Local\Programs\Python\Python312\Lib\site-packages\pytorch_lightning\trainer\connectors\data_connector.py:419: Consider setting `persistent_workers=True` in 'test_dataloader' to speed up the dataloader worker initialization.


Testing: |          | 0/? [00:00<?, ?it/s]