In [30]:
# Imports básicos
import copy
import io
import os
import warnings

import matplotlib.pyplot as plt
import numpy as np
import pytorch_lightning as pl
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
import torchmetrics
import wandb
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
warnings.filterwarnings('ignore')

# Importar nuestros módulos de carga de datos
from dataset import ButterflyDataset, get_transforms
from datamodule import ButterflyDataModule

# Select the compute device: CUDA when it is available, otherwise the CPU
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
print(f'Using device: {device}')


Using device: cpu


In [31]:
# Project configuration: data locations and training hyperparameters
DATA_DIR = "filtered_dataset/train"
METADATA_CSV = "filtered_dataset/filtered_dataset_metadata.csv"
BATCH_SIZE, IMAGE_SIZE = 32, 224
NUM_EPOCHS = 50
LEARNING_RATE = 1e-3
SEED = 42

# Fix the random seed for reproducibility
pl.seed_everything(SEED)

# Start the Weights & Biases run and attach the hyperparameters to it
wandb.init(
    project="butterfly-transfer-learning",
    name="experiment_1_modular",
    config=dict(
        batch_size=BATCH_SIZE,
        image_size=IMAGE_SIZE,
        num_epochs=NUM_EPOCHS,
        learning_rate=LEARNING_RATE,
        seed=SEED,
    ),
)


Seed set to 42


[34m[1mwandb[0m: [32m[41mERROR[0m The nbformat package was not found. It is required to save notebook history.


In [32]:
# DataModule for the 70-30 experiment, loading data with 4 worker processes
data_module_70_30 = ButterflyDataModule(
    data_dir=DATA_DIR,
    metadata_csv=METADATA_CSV,
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
    num_workers=4,
    labeled_ratio=0.3,  # 30% labeled, 70% unlabeled
    seed=SEED,
)

# Build the datasets, then print the summary reported by the DataModule
data_module_70_30.setup()
print("Información del dataset 70-30:")
info = data_module_70_30.get_dataset_info()
for field_name, field_value in info.items():
    print(f"{field_name}: {field_value}")


Found 3693 images across 30 classes
Classes: ['ARCIGERA FLOWER MOTH', 'ATALA', 'BANDED ORANGE HELICONIAN', 'BANDED TIGER MOTH', 'BIRD CHERRY ERMINE MOTH', 'BROOKES BIRDWING', 'BROWN ARGUS', 'BROWN SIPROETA', 'CHALK HILL BLUE', 'CHECQUERED SKIPPER', 'CLEOPATRA', 'COPPER TAIL', 'CRECENT', 'DANAID EGGFLY', 'EASTERN COMA', 'EASTERN PINE ELFIN', 'EMPEROR GUM MOTH', 'GREAT JAY', 'GREEN HAIRSTREAK', 'HERCULES MOTH', 'HUMMING BIRD HAWK MOTH', 'Iphiclus sister', 'MILBERTS TORTOISESHELL', 'MOURNING CLOAK', 'ORANGE TIP', 'RED CRACKER', 'ROSY MAPLE MOTH', 'SCARCE SWALLOW', 'SLEEPY ORANGE', 'WHITE LINED SPHINX MOTH']
Dataset splits - Train: 2584, Val: 739, Test: 370
Semi-supervised split - Labeled: 775, Unlabeled: 1809
Información del dataset 70-30:
num_classes: 30
class_names: ['ARCIGERA FLOWER MOTH', 'ATALA', 'BANDED ORANGE HELICONIAN', 'BANDED TIGER MOTH', 'BIRD CHERRY ERMINE MOTH', 'BROOKES BIRDWING', 'BROWN ARGUS', 'BROWN SIPROETA', 'CHALK HILL BLUE', 'CHECQUERED SKIPPER', 'CLEOPATRA', 'COPPER

In [34]:
class DoubleConv(nn.Module):
    """Two stacked stages of (3x3 conv => BatchNorm => ReLU).

    The first stage maps ``in_channels`` to ``out_channels``; the second keeps
    ``out_channels``. ``padding=1`` keeps the spatial size unchanged.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        stages = []
        # Same layer order as a hand-written Sequential, so state-dict keys
        # (double_conv.0 ... double_conv.5) stay identical.
        for stage_in in (in_channels, out_channels):
            stages.extend([
                nn.Conv2d(stage_in, out_channels, kernel_size=3, padding=1),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(inplace=True),
            ])
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        out = self.double_conv(x)
        return out


class Down(nn.Module):
    """Encoder stage: max-pooling with kernel size 2 followed by a DoubleConv."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pool = nn.MaxPool2d(2)
        conv = DoubleConv(in_channels, out_channels)
        self.maxpool_conv = nn.Sequential(pool, conv)

    def forward(self, x):
        downsampled = self.maxpool_conv(x)
        return downsampled


class Up(nn.Module):
    """Decoder stage: transposed-conv upsampling, skip concatenation, DoubleConv.

    ``forward(x1, x2)`` upsamples ``x1``, pads it to the height/width of the
    skip tensor ``x2``, concatenates both along the channel axis and applies
    the double convolution.
    """

    def __init__(self, in_channels, out_channels):
        super().__init__()
        half_channels = in_channels // 2
        self.up = nn.ConvTranspose2d(in_channels, half_channels, kernel_size=2, stride=2)
        self.conv = DoubleConv(in_channels, out_channels)

    def forward(self, x1, x2):
        upsampled = self.up(x1)
        # Size gap between the skip tensor and the upsampled map (dim 2 = height, dim 3 = width)
        gap_h = x2.size(2) - upsampled.size(2)
        gap_w = x2.size(3) - upsampled.size(3)
        left, top = gap_w // 2, gap_h // 2
        # F.pad takes (left, right, top, bottom) for the last two dimensions
        upsampled = F.pad(upsampled, [left, gap_w - left, top, gap_h - top])
        # Skip connection: concatenate encoder and decoder features channel-wise
        merged = torch.cat([x2, upsampled], dim=1)
        return self.conv(merged)


In [45]:
class UNetAutoencoder(pl.LightningModule):
    """U-Net autoencoder with skip connections, used for unsupervised pretraining.

    The network is trained to reconstruct its own input with an MSE loss. The
    encoder stages (``inc``, ``down1`` .. ``down4``) can afterwards be reused as
    a feature extractor for a classifier.

    Args:
        n_channels: Number of channels of the input and of the reconstruction.
        learning_rate: Learning rate passed to Adam.
        output_activation: ``"sigmoid"`` (default, the original behaviour)
            squashes the reconstruction into (0, 1); ``"none"`` returns the raw
            output of the final 1x1 convolution.

    Raises:
        ValueError: If ``output_activation`` is not one of the supported names.

    NOTE(review): with the sigmoid the reconstruction stays inside (0, 1), so
    the MSE against targets in [0, 1] cannot exceed 1. The training log in this
    notebook shows ``train_loss`` values of 1.63 and 1.90, which suggests the
    images reaching the model are normalised outside [0, 1] -- TODO confirm
    against the DataModule transforms, then either reconstruct un-normalised
    images or construct the model with ``output_activation="none"``.
    """

    _OUTPUT_ACTIVATIONS = ("sigmoid", "none")

    def __init__(self, n_channels=3, learning_rate=1e-3, output_activation="sigmoid"):
        super().__init__()
        if output_activation not in self._OUTPUT_ACTIVATIONS:
            raise ValueError(
                f"output_activation must be one of {self._OUTPUT_ACTIVATIONS}, "
                f"got {output_activation!r}"
            )
        self.learning_rate = learning_rate
        self.output_activation = output_activation
        self.save_hyperparameters()

        # Encoder (contracting path)
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024)

        # Decoder (expansive path)
        self.up1 = Up(1024, 512)
        self.up2 = Up(512, 256)
        self.up3 = Up(256, 128)
        self.up4 = Up(128, 64)
        self.outc = nn.Conv2d(64, n_channels, kernel_size=1)

        # Bottleneck features of the most recent forward pass
        self.encoder_features = None

    def _encode(self, x):
        """Run the contracting path and return the feature map of every stage."""
        x1 = self.inc(x)
        x2 = self.down1(x1)
        x3 = self.down2(x2)
        x4 = self.down3(x3)
        x5 = self.down4(x4)
        return x1, x2, x3, x4, x5

    def forward(self, x):
        """Reconstruct ``x``; also caches the bottleneck in ``encoder_features``."""
        x1, x2, x3, x4, x5 = self._encode(x)

        # Keep the bottleneck features available for transfer learning
        self.encoder_features = x5

        # Decoder with skip connections from the matching encoder stage
        x = self.up1(x5, x4)
        x = self.up2(x, x3)
        x = self.up3(x, x2)
        x = self.up4(x, x1)
        logits = self.outc(x)

        if self.output_activation == "sigmoid":
            return torch.sigmoid(logits)  # output in (0, 1)
        return logits

    def get_encoder_features(self, x):
        """Return the bottleneck feature map of the encoder for ``x``."""
        return self._encode(x)[-1]

    @staticmethod
    def _images_from(batch):
        """Return the image tensor of a batch, dropping labels when present."""
        if isinstance(batch, (list, tuple)):
            return batch[0]
        return batch

    def _reconstruction_loss(self, batch):
        """MSE between the reconstruction and the input images of ``batch``."""
        x = self._images_from(batch)
        return F.mse_loss(self(x), x)

    @staticmethod
    def _log_to_wandb(name, loss):
        """Send a scalar loss to W&B, skipping silently when no run is active."""
        if wandb.run is not None:
            wandb.log({name: loss.detach().item()})

    def training_step(self, batch, batch_idx):
        loss = self._reconstruction_loss(batch)
        self.log('train_loss', loss, prog_bar=True)
        self._log_to_wandb('train_autoencoder_loss', loss)
        return loss

    def validation_step(self, batch, batch_idx):
        loss = self._reconstruction_loss(batch)
        self.log('val_loss', loss, prog_bar=True)
        self._log_to_wandb('val_autoencoder_loss', loss)
        return loss

    def configure_optimizers(self):
        """Adam plus a ReduceLROnPlateau scheduler driven by ``val_loss``."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=5
        )
        return {
            'optimizer': optimizer,
            'lr_scheduler': {
                'scheduler': scheduler,
                'monitor': 'val_loss'
            }
        }


In [46]:
class ButterflyClassifier(pl.LightningModule):
    """Butterfly classifier built on a U-Net style encoder.

    The encoder is either copied from a pretrained autoencoder or created from
    scratch; a pooled two-layer MLP head produces the class logits.

    Args:
        num_classes: Number of output classes.
        learning_rate: Learning rate passed to Adam.
        pretrained_encoder: Optional module exposing ``inc`` and
            ``down1`` .. ``down4``. Its stages are deep-copied, so training this
            classifier never modifies the autoencoder or other classifiers
            built from the same autoencoder.
        freeze_encoder: When a pretrained encoder is given, ``True`` disables
            gradients for the copied encoder and ``False`` enables them. Ignored
            when the encoder is created from scratch.
    """

    def __init__(self, num_classes, learning_rate=1e-3, pretrained_encoder=None, freeze_encoder=False):
        super().__init__()
        self.num_classes = num_classes
        self.learning_rate = learning_rate
        self.freeze_encoder = freeze_encoder
        self.save_hyperparameters(ignore=['pretrained_encoder'])

        if pretrained_encoder is not None:
            # Copy the pretrained stages instead of sharing them: sharing would
            # let one classifier's freeze or fine-tuning leak into every other
            # model that holds the same module objects.
            pretrained_stages = (
                pretrained_encoder.inc,
                pretrained_encoder.down1,
                pretrained_encoder.down2,
                pretrained_encoder.down3,
                pretrained_encoder.down4,
            )
            self.encoder = nn.Sequential(
                *(copy.deepcopy(stage) for stage in pretrained_stages)
            )

            # Set requires_grad in both directions; the source parameters may
            # already have been frozen by an earlier user of the autoencoder.
            self.encoder.requires_grad_(not freeze_encoder)

            if freeze_encoder:
                print("Encoder congelado para fine-tuning")
            else:
                print("Encoder descongelado para fine-tuning completo")
        else:
            # Encoder trained from scratch (Classifier A)
            self.encoder = nn.Sequential(
                DoubleConv(3, 64),
                Down(64, 128),
                Down(128, 256),
                Down(256, 512),
                Down(512, 1024)
            )
            print("Encoder creado desde cero")

        # Classification head on top of the globally pooled bottleneck features
        self.classifier = nn.Sequential(
            nn.AdaptiveAvgPool2d((1, 1)),
            nn.Flatten(),
            nn.Dropout(0.5),
            nn.Linear(1024, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, num_classes)
        )

        # One accuracy metric per stage so their states never mix
        self.train_acc = torchmetrics.Accuracy(task='multiclass', num_classes=num_classes)
        self.val_acc = torchmetrics.Accuracy(task='multiclass', num_classes=num_classes)
        self.test_acc = torchmetrics.Accuracy(task='multiclass', num_classes=num_classes)

    def forward(self, x):
        """Return class logits for the image batch ``x``."""
        features = self.encoder(x)
        return self.classifier(features)

    def _evaluate_batch(self, batch, metric):
        """Compute loss, accuracy, predictions and targets for one batch."""
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        preds = torch.argmax(logits, dim=1)
        acc = metric(preds, y)
        return loss, acc, preds, y

    def training_step(self, batch, batch_idx):
        loss, acc, _, _ = self._evaluate_batch(batch, self.train_acc)
        self.log('train_loss', loss, prog_bar=True)
        self.log('train_acc', acc, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        loss, acc, _, _ = self._evaluate_batch(batch, self.val_acc)
        self.log('val_loss', loss, prog_bar=True)
        self.log('val_acc', acc, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        loss, acc, preds, targets = self._evaluate_batch(batch, self.test_acc)
        self.log('test_loss', loss)
        self.log('test_acc', acc)
        return {'test_loss': loss, 'test_acc': acc, 'preds': preds, 'targets': targets}

    def configure_optimizers(self):
        """Adam plus a ReduceLROnPlateau scheduler driven by ``val_loss``."""
        optimizer = torch.optim.Adam(self.parameters(), lr=self.learning_rate)
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, mode='min', factor=0.5, patience=5
        )
        return {
            'optimizer': optimizer,
            'lr_scheduler': {
                'scheduler': scheduler,
                'monitor': 'val_loss'
            }
        }


In [47]:
def train_autoencoder(data_module, max_epochs=30, accelerator='cpu', devices=1):
    """Train the U-Net autoencoder on the unlabeled split.

    Args:
        data_module: DataModule providing ``unlabeled_dataloader()`` for
            training and ``val_dataloader()`` for validation.
        max_epochs: Upper bound on the number of training epochs; early
            stopping on ``val_loss`` (patience 10) may end training sooner.
        accelerator: Lightning accelerator name. Defaults to ``'cpu'`` (the
            previous hard-coded value); pass e.g. ``'auto'`` to use a GPU when
            one is available.
        devices: Number of devices handed to the Trainer.

    Returns:
        The trained ``UNetAutoencoder`` instance. The checkpoint callback
        writes the best ``val_loss`` weights to disk; this function does not
        reload them into the returned model.
    """
    print("=== Entrenando Autoencoder U-Net ===")

    autoencoder = UNetAutoencoder(learning_rate=LEARNING_RATE)

    # Stop when val_loss stalls and keep the single best checkpoint on disk
    callbacks = [
        pl.callbacks.EarlyStopping(
            monitor='val_loss',
            patience=10,
            mode='min'
        ),
        pl.callbacks.ModelCheckpoint(
            monitor='val_loss',
            mode='min',
            save_top_k=1,
            filename='autoencoder-{epoch:02d}-{val_loss:.2f}'
        )
    ]

    trainer = pl.Trainer(
        max_epochs=max_epochs,
        callbacks=callbacks,
        accelerator=accelerator,
        devices=devices,
        log_every_n_steps=10
    )

    # Train on the unlabeled images, validate on the validation split
    trainer.fit(
        model=autoencoder,
        train_dataloaders=data_module.unlabeled_dataloader(),
        val_dataloaders=data_module.val_dataloader()
    )

    return autoencoder


def train_classifier(data_module, pretrained_encoder=None, freeze_encoder=False,
                    classifier_name="", max_epochs=50, accelerator='cpu', devices=1):
    """Train a ``ButterflyClassifier`` on the labeled split.

    Args:
        data_module: DataModule providing ``num_classes``,
            ``labeled_dataloader()`` and ``val_dataloader()``.
        pretrained_encoder: Optional pretrained autoencoder forwarded to the
            classifier; ``None`` trains the encoder from scratch.
        freeze_encoder: Forwarded to the classifier.
        classifier_name: Human-readable name, printed and used (lower-cased,
            spaces replaced by underscores) as the checkpoint filename prefix.
        max_epochs: Upper bound on the number of training epochs; early
            stopping on ``val_acc`` (patience 15) may end training sooner.
        accelerator: Lightning accelerator name. Defaults to ``'cpu'`` (the
            previous hard-coded value); pass e.g. ``'auto'`` to use a GPU when
            one is available.
        devices: Number of devices handed to the Trainer.

    Returns:
        Tuple ``(classifier, trainer)``. The checkpoint callback writes the
        best ``val_acc`` weights to disk; this function does not reload them
        into the returned classifier.
    """
    print(f"=== Entrenando {classifier_name} ===")

    classifier = ButterflyClassifier(
        num_classes=data_module.num_classes,
        learning_rate=LEARNING_RATE,
        pretrained_encoder=pretrained_encoder,
        freeze_encoder=freeze_encoder
    )

    checkpoint_prefix = classifier_name.lower().replace(" ", "_")

    # Stop when val_acc stalls and keep the single best checkpoint on disk
    callbacks = [
        pl.callbacks.EarlyStopping(
            monitor='val_acc',
            patience=15,
            mode='max'
        ),
        pl.callbacks.ModelCheckpoint(
            monitor='val_acc',
            mode='max',
            save_top_k=1,
            filename=f'{checkpoint_prefix}-{{epoch:02d}}-{{val_acc:.2f}}'
        )
    ]

    trainer = pl.Trainer(
        max_epochs=max_epochs,
        callbacks=callbacks,
        accelerator=accelerator,
        devices=devices,
        log_every_n_steps=10
    )

    # Train on the labeled images, validate on the validation split
    trainer.fit(
        model=classifier,
        train_dataloaders=data_module.labeled_dataloader(),
        val_dataloaders=data_module.val_dataloader()
    )

    return classifier, trainer


def evaluate_model(model, data_module, model_name=""):
    """Evaluate a classifier on the test split and log the metrics to W&B.

    The test set is traversed twice: once through ``Trainer.test`` for the
    logged loss/accuracy, and once manually to collect per-sample predictions.

    Args:
        model: Trained classifier returning class logits.
        data_module: DataModule providing ``test_dataloader()``.
        model_name: Name used in the printed header and as W&B key prefix.

    Returns:
        Dict with ``model_name``, ``test_accuracy``, ``test_loss``, weighted
        ``precision`` / ``recall`` / ``f1_score``, and the ``predictions`` and
        ``targets`` lists of integer class indices.
    """
    print(f"=== Evaluando {model_name} ===")

    # A fresh trainer is enough for testing
    trainer = pl.Trainer(accelerator='cpu', devices=1)
    test_results = trainer.test(model, data_module.test_dataloader())[0]

    # Second pass: collect per-sample predictions (used for the confusion matrix)
    model.eval()
    all_preds = []
    all_targets = []

    with torch.no_grad():
        for x, y in data_module.test_dataloader():
            x, y = x.to(model.device), y.to(model.device)
            preds = torch.argmax(model(x), dim=1)

            all_preds.extend(preds.cpu().tolist())
            all_targets.extend(y.cpu().tolist())

    # Classes that are never predicted have undefined precision. Score them as
    # 0 explicitly (the value the default would use) instead of relying on a
    # warning that this notebook suppresses globally.
    precision, recall, f1, _ = precision_recall_fscore_support(
        all_targets, all_preds, average='weighted', zero_division=0
    )

    results = {
        'model_name': model_name,
        'test_accuracy': test_results['test_acc'],
        'test_loss': test_results['test_loss'],
        'precision': precision,
        'recall': recall,
        'f1_score': f1,
        'predictions': all_preds,
        'targets': all_targets
    }

    wandb.log({
        f'{model_name}_test_accuracy': test_results['test_acc'],
        f'{model_name}_test_loss': test_results['test_loss'],
        f'{model_name}_precision': precision,
        f'{model_name}_recall': recall,
        f'{model_name}_f1_score': f1
    })

    return results


In [None]:
# Train the autoencoder on the unlabeled data (70%)
autoencoder_70_30 = train_autoencoder(data_module_70_30, max_epochs=30)

# "\n" (not "\\n") so a blank line is printed instead of a literal backslash-n
print("\nAutoencoder 70-30 entrenado exitosamente!")


GPU available: True (mps), used: False
TPU available: False, using: 0 TPU cores
HPU available: False, using: 0 HPUs

  | Name  | Type       | Params | Mode 
---------------------------------------------
0 | inc   | DoubleConv | 39.0 K | train
1 | down1 | Down       | 221 K  | train
2 | down2 | Down       | 886 K  | train
3 | down3 | Down       | 3.5 M  | train
4 | down4 | Down       | 14.2 M | train
5 | up1   | Up         | 9.2 M  | train
6 | up2   | Up         | 2.3 M  | train
7 | up3   | Up         | 574 K  | train
8 | up4   | Up         | 143 K  | train
9 | outc  | Conv2d     | 195    | train
---------------------------------------------
31.0 M    Trainable params
0         Non-trainable params
31.0 M    Total params
124.175   Total estimated model params size (MB)
93        Modules in train mode
0         Modules in eval mode


=== Entrenando Autoencoder U-Net ===
Epoch 0:  40%|████      | 23/57 [14:04<20:48,  0.03it/s, v_num=10, train_loss=1.630]

wandb-core(24169) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.
wandb-core(24190) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


Epoch 0:  42%|████▏     | 24/57 [14:46<20:18,  0.03it/s, v_num=10, train_loss=1.900]

wandb-core(24205) MallocStackLogging: can't turn off malloc stack logging because it was not enabled.


In [None]:
# Train the three classifiers of the 70-30 experiment.
# Separators use "\n" (not "\\n") so a real newline is printed.

# Classifier A: no pretraining
print("\n" + "="*50)
classifier_A_70_30, trainer_A = train_classifier(
    data_module_70_30,
    pretrained_encoder=None,
    freeze_encoder=False,
    classifier_name="Clasificador A (Sin preentrenamiento) 70-30",
    max_epochs=NUM_EPOCHS
)

# Classifier B1: pretrained encoder, frozen
print("\n" + "="*50)
classifier_B1_70_30, trainer_B1 = train_classifier(
    data_module_70_30,
    pretrained_encoder=autoencoder_70_30,
    freeze_encoder=True,
    classifier_name="Clasificador B1 (Preentrenado congelado) 70-30",
    max_epochs=NUM_EPOCHS
)

# Classifier B2: pretrained encoder, full fine-tuning
print("\n" + "="*50)
classifier_B2_70_30, trainer_B2 = train_classifier(
    data_module_70_30,
    pretrained_encoder=autoencoder_70_30,
    freeze_encoder=False,
    classifier_name="Clasificador B2 (Preentrenado fine-tuning) 70-30",
    max_epochs=NUM_EPOCHS
)

print("\nTodos los clasificadores 70-30 entrenados exitosamente!")


In [None]:
# Evaluate every model of the 70-30 experiment
results_70_30 = {}

results_70_30['A'] = evaluate_model(
    classifier_A_70_30, data_module_70_30,
    "Clasificador A 70-30"
)

results_70_30['B1'] = evaluate_model(
    classifier_B1_70_30, data_module_70_30,
    "Clasificador B1 70-30"
)

results_70_30['B2'] = evaluate_model(
    classifier_B2_70_30, data_module_70_30,
    "Clasificador B2 70-30"
)

# "\n" (not "\\n") so a blank line is printed instead of a literal backslash-n
print("\nResultados Experimento 70-30:")
print("-" * 60)
for name, results in results_70_30.items():
    print(f"{results['model_name']}:")
    print(f"  Accuracy: {results['test_accuracy']:.4f}")
    print(f"  Precision: {results['precision']:.4f}")
    print(f"  Recall: {results['recall']:.4f}")
    print(f"  F1-Score: {results['f1_score']:.4f}")
    print()


In [None]:
# DataModule for the 90-10 experiment
data_module_90_10 = ButterflyDataModule(
    data_dir=DATA_DIR,
    metadata_csv=METADATA_CSV,
    batch_size=BATCH_SIZE,
    image_size=IMAGE_SIZE,
    num_workers=4,
    labeled_ratio=0.1,  # 10% labeled, 90% unlabeled
    seed=SEED,
)

# Build the datasets, then print the summary reported by the DataModule
data_module_90_10.setup()
print("Información del dataset 90-10:")
info = data_module_90_10.get_dataset_info()
for field_name, field_value in info.items():
    print(f"{field_name}: {field_value}")


In [None]:
# Train the autoencoder on the unlabeled data (90%)
autoencoder_90_10 = train_autoencoder(data_module_90_10, max_epochs=30)

# "\n" (not "\\n") so a blank line is printed instead of a literal backslash-n
print("\nAutoencoder 90-10 entrenado exitosamente!")


In [None]:
# Train the three classifiers of the 90-10 experiment.
# Separators use "\n" (not "\\n") so a real newline is printed.

# Classifier A: no pretraining
print("\n" + "="*50)
classifier_A_90_10, trainer_A_90 = train_classifier(
    data_module_90_10,
    pretrained_encoder=None,
    freeze_encoder=False,
    classifier_name="Clasificador A (Sin preentrenamiento) 90-10",
    max_epochs=NUM_EPOCHS
)

# Classifier B1: pretrained encoder, frozen
print("\n" + "="*50)
classifier_B1_90_10, trainer_B1_90 = train_classifier(
    data_module_90_10,
    pretrained_encoder=autoencoder_90_10,
    freeze_encoder=True,
    classifier_name="Clasificador B1 (Preentrenado congelado) 90-10",
    max_epochs=NUM_EPOCHS
)

# Classifier B2: pretrained encoder, full fine-tuning
print("\n" + "="*50)
classifier_B2_90_10, trainer_B2_90 = train_classifier(
    data_module_90_10,
    pretrained_encoder=autoencoder_90_10,
    freeze_encoder=False,
    classifier_name="Clasificador B2 (Preentrenado fine-tuning) 90-10",
    max_epochs=NUM_EPOCHS
)

print("\nTodos los clasificadores 90-10 entrenados exitosamente!")


In [None]:
# Evaluate every model of the 90-10 experiment
results_90_10 = {}

results_90_10['A'] = evaluate_model(
    classifier_A_90_10, data_module_90_10,
    "Clasificador A 90-10"
)

results_90_10['B1'] = evaluate_model(
    classifier_B1_90_10, data_module_90_10,
    "Clasificador B1 90-10"
)

results_90_10['B2'] = evaluate_model(
    classifier_B2_90_10, data_module_90_10,
    "Clasificador B2 90-10"
)

# "\n" (not "\\n") so a blank line is printed instead of a literal backslash-n
print("\nResultados Experimento 90-10:")
print("-" * 60)
for name, results in results_90_10.items():
    print(f"{results['model_name']}:")
    print(f"  Accuracy: {results['test_accuracy']:.4f}")
    print(f"  Precision: {results['precision']:.4f}")
    print(f"  Recall: {results['recall']:.4f}")
    print(f"  F1-Score: {results['f1_score']:.4f}")
    print()


In [None]:
import time
import torch.quantization as quantization

def quantize_model(model, data_loader, model_name=""):
    """Return a dynamically quantized (post-training, int8) copy of ``model``.

    This is post-training dynamic quantization, not quantization-aware
    training: no calibration or retraining happens, which is why
    ``data_loader`` is not used.

    Only ``nn.Linear`` layers are requested. ``nn.Conv2d`` is not among the
    layer types PyTorch's dynamic quantization converts, so listing it would
    leave the convolutions in float while suggesting otherwise.

    Args:
        model: Model to quantize. It is switched to eval mode; the quantized
            model is a separate object returned by ``quantize_dynamic``.
        data_loader: Unused; kept for interface compatibility.
        model_name: Name shown in the printed header.

    Returns:
        The quantized model.
    """
    print(f"=== Cuantizando {model_name} ===")

    model.eval()
    model_quantized = torch.quantization.quantize_dynamic(
        model,
        {nn.Linear},
        dtype=torch.qint8
    )

    return model_quantized

def compare_model_performance(original_model, quantized_model, data_loader, model_name=""):
    """Compare inference time and serialized size of a model and its quantized copy.

    Args:
        original_model: The float model.
        quantized_model: The quantized counterpart.
        data_loader: Loader yielding ``(inputs, labels)`` batches; at most the
            first 10 batches are timed for each model.
        model_name: Name used in the printed report and as W&B key prefix.

    Returns:
        Tuple ``(results, quantized_model)`` where ``results`` holds the mean
        and std of the per-batch inference time of both models, the speedup,
        both sizes in MB and the compression ratio.

    Raises:
        ValueError: If ``data_loader`` yields no batches.
    """
    print(f"\n=== Comparando rendimiento: {model_name} ===")

    def measure_inference_time(model, data_loader, num_batches=10):
        """Mean and std of the forward-pass wall time over the first batches."""
        model.eval()
        times = []

        with torch.no_grad():
            for i, batch in enumerate(data_loader):
                if i >= num_batches:
                    break

                x, _ = batch
                # perf_counter is a monotonic, high-resolution clock meant for timing
                start_time = time.perf_counter()
                _ = model(x)
                times.append(time.perf_counter() - start_time)

        if not times:
            raise ValueError("data_loader produced no batches; cannot time inference")

        return np.mean(times), np.std(times)

    orig_mean, orig_std = measure_inference_time(original_model, data_loader)
    quant_mean, quant_std = measure_inference_time(quantized_model, data_loader)

    def get_model_size(model):
        """Size in MB of the serialized state dict.

        Summing ``parameters()`` and ``buffers()`` would miss the packed int8
        weights of quantized layers, which are not registered as parameters,
        so the serialized state dict is measured instead.
        """
        serialized = io.BytesIO()
        torch.save(model.state_dict(), serialized)
        return serialized.getbuffer().nbytes / 1024 / 1024  # MB

    orig_size = get_model_size(original_model)
    quant_size = get_model_size(quantized_model)

    results = {
        'model_name': model_name,
        'original_inference_time_mean': orig_mean,
        'original_inference_time_std': orig_std,
        'quantized_inference_time_mean': quant_mean,
        'quantized_inference_time_std': quant_std,
        'speedup': orig_mean / quant_mean,
        'original_size_mb': orig_size,
        'quantized_size_mb': quant_size,
        'compression_ratio': orig_size / quant_size
    }

    print(f"Modelo Original:")
    print(f"  Tiempo de inferencia: {orig_mean:.4f} ± {orig_std:.4f} segundos")
    print(f"  Tamaño: {orig_size:.2f} MB")
    print(f"\nModelo Cuantizado:")
    print(f"  Tiempo de inferencia: {quant_mean:.4f} ± {quant_std:.4f} segundos")
    print(f"  Tamaño: {quant_size:.2f} MB")
    print(f"\nMejoras:")
    print(f"  Speedup: {results['speedup']:.2f}x")
    print(f"  Compresión: {results['compression_ratio']:.2f}x")

    wandb.log({
        f'{model_name}_quantization_speedup': results['speedup'],
        f'{model_name}_quantization_compression': results['compression_ratio'],
        f'{model_name}_original_size_mb': orig_size,
        f'{model_name}_quantized_size_mb': quant_size
    })

    return results, quantized_model


In [None]:
# Quantize the B2 classifier of each experiment (expected best performer).
# Headers use "\n" (not "\\n") so a real newline is printed.
quantization_results = {}

# Experiment 70-30
print("\n" + "="*60)
print("CUANTIZACIÓN EXPERIMENTO 70-30")
print("="*60)

model_B2_70_30_quantized = quantize_model(
    classifier_B2_70_30,
    data_module_70_30.test_dataloader(),
    "Clasificador B2 70-30"
)

quantization_results['B2_70_30'], _ = compare_model_performance(
    classifier_B2_70_30,
    model_B2_70_30_quantized,
    data_module_70_30.test_dataloader(),
    "Clasificador B2 70-30"
)

# Experiment 90-10
print("\n" + "="*60)
print("CUANTIZACIÓN EXPERIMENTO 90-10")
print("="*60)

model_B2_90_10_quantized = quantize_model(
    classifier_B2_90_10,
    data_module_90_10.test_dataloader(),
    "Clasificador B2 90-10"
)

quantization_results['B2_90_10'], _ = compare_model_performance(
    classifier_B2_90_10,
    model_B2_90_10_quantized,
    data_module_90_10.test_dataloader(),
    "Clasificador B2 90-10"
)


In [None]:
def plot_confusion_matrix(results, class_names, title=""):
    """Plot the confusion matrix of one evaluated model as a heatmap.

    Args:
        results: Dict with ``'targets'`` and ``'predictions'`` sequences.
        class_names: Names used as tick labels on both axes.
        title: Text appended to the figure title.
    """
    # Pin the label set so the matrix always has one row/column per class.
    # Without it, a class absent from both targets and predictions is dropped
    # and the matrix no longer lines up with the class_names tick labels.
    # Assumes targets/predictions are integer indices into class_names -- TODO confirm.
    label_ids = list(range(len(class_names)))
    cm = confusion_matrix(results['targets'], results['predictions'], labels=label_ids)

    plt.figure(figsize=(12, 10))
    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues',
                xticklabels=class_names, yticklabels=class_names)
    plt.title(f'Matriz de Confusión - {title}')
    plt.xlabel('Predicción')
    plt.ylabel('Verdadero')
    plt.xticks(rotation=45, ha='right')
    plt.yticks(rotation=0)
    plt.tight_layout()
    plt.show()

def plot_results_comparison():
    """Draw a 2x2 grid of grouped bar charts, one panel per metric.

    Reads the module-level ``results_70_30`` and ``results_90_10`` dictionaries
    and compares classifiers A, B1 and B2 across both experiments.
    """
    classifiers = ['A', 'B1', 'B2']
    metrics = ['test_accuracy', 'precision', 'recall', 'f1_score']

    fig, axes = plt.subplots(2, 2, figsize=(15, 12))
    fig.suptitle('Comparación de Rendimiento por Experimento', fontsize=16)

    positions = np.arange(len(classifiers))
    width = 0.35

    # axes.flat walks the grid row by row, one panel per metric
    for ax, metric in zip(axes.flat, metrics):
        pretty_metric = metric.replace('_', ' ').title()

        values_70_30 = [results_70_30[clf][metric] for clf in classifiers]
        values_90_10 = [results_90_10[clf][metric] for clf in classifiers]

        # Two bars per classifier, centred on the classifier's tick
        bars_70_30 = ax.bar(positions - width/2, values_70_30, width, label='70-30', alpha=0.8)
        bars_90_10 = ax.bar(positions + width/2, values_90_10, width, label='90-10', alpha=0.8)

        ax.set_xlabel('Clasificadores')
        ax.set_ylabel(pretty_metric)
        ax.set_title(f'{pretty_metric} por Clasificador')
        ax.set_xticks(positions)
        ax.set_xticklabels(classifiers)
        ax.legend()
        ax.grid(True, alpha=0.3)

        # Write each bar's value just above it
        for bar in (*bars_70_30, *bars_90_10):
            height = bar.get_height()
            ax.text(bar.get_x() + bar.get_width()/2., height,
                   f'{height:.3f}', ha='center', va='bottom', fontsize=9)

    plt.tight_layout()
    plt.show()

def plot_quantization_comparison():
    """Draw side-by-side bar charts of quantization speedup and compression.

    Reads the module-level ``quantization_results`` dictionary; each entry
    becomes one bar in both panels.
    """
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 6))
    models = list(quantization_results.keys())

    def draw_panel(ax, result_key, colors, title, ylabel):
        """Fill one panel with the bars and value labels for ``result_key``."""
        values = [quantization_results[model][result_key] for model in models]
        ax.bar(models, values, color=colors)
        ax.set_title(title)
        ax.set_ylabel(ylabel)
        ax.grid(True, alpha=0.3)
        # Write each value slightly above its bar
        for position, value in enumerate(values):
            ax.text(position, value + 0.01, f'{value:.2f}x', ha='center', va='bottom')

    draw_panel(ax1, 'speedup', ['skyblue', 'lightcoral'],
               'Speedup por Cuantización', 'Speedup (x)')
    draw_panel(ax2, 'compression_ratio', ['lightgreen', 'lightsalmon'],
               'Ratio de Compresión', 'Compresión (x)')

    plt.tight_layout()
    plt.show()

# Generate the visualizations.
# Headers use "\n" (not "\\n") so a real newline is printed.
print("\n" + "="*60)
print("GENERANDO VISUALIZACIONES")
print("="*60)

# Performance comparison
plot_results_comparison()

# Quantization comparison
plot_quantization_comparison()

# Confusion matrices of the B2 classifiers
print("\nMatrices de Confusión:")
plot_confusion_matrix(results_70_30['B2'], data_module_70_30.class_names,
                     "Clasificador B2 (70-30)")
plot_confusion_matrix(results_90_10['B2'], data_module_90_10.class_names,
                     "Clasificador B2 (90-10)")


In [None]:
def generate_final_report():
    """Print the final report of the experiment and log a summary to W&B.

    Reads the module-level ``results_70_30``, ``results_90_10`` and
    ``quantization_results`` dictionaries. The conclusions at the end are
    derived from those measured results rather than printed unconditionally,
    so the report cannot claim an improvement the numbers do not show.
    """
    experiments = [("70-30", results_70_30), ("90-10", results_90_10)]
    classifier_keys = ['A', 'B1', 'B2']

    print("\n" + "="*80)
    print("REPORTE FINAL - EXPERIMENTO 1: TRANSFER LEARNING CON AUTOENCODERS U-NET")
    print("="*80)

    print("\n📊 RESUMEN DE EXPERIMENTOS:")
    print("-" * 50)

    # Comparison table
    print(f"{'Experimento':<15} {'Clasificador':<15} {'Accuracy':<10} {'Precision':<10} {'Recall':<10} {'F1-Score':<10}")
    print("-" * 75)

    for exp_name, results in experiments:
        for clf_name, clf_results in results.items():
            print(f"{exp_name:<15} {clf_name:<15} {clf_results['test_accuracy']:<10.4f} "
                  f"{clf_results['precision']:<10.4f} {clf_results['recall']:<10.4f} "
                  f"{clf_results['f1_score']:<10.4f}")

    print("\n🔍 ANÁLISIS COMPARATIVO:")
    print("-" * 50)

    # Best model of each experiment by test accuracy
    best_70_30 = max(results_70_30.items(), key=lambda x: x[1]['test_accuracy'])
    best_90_10 = max(results_90_10.items(), key=lambda x: x[1]['test_accuracy'])

    print(f"Mejor modelo 70-30: {best_70_30[0]} con accuracy {best_70_30[1]['test_accuracy']:.4f}")
    print(f"Mejor modelo 90-10: {best_90_10[0]} con accuracy {best_90_10[1]['test_accuracy']:.4f}")

    # Effect of pretraining relative to the from-scratch baseline (classifier A)
    print("\n📈 IMPACTO DEL PREENTRENAMIENTO:")
    print("-" * 50)

    for exp_name, results in experiments:
        baseline = results['A']['test_accuracy']
        b1_improvement = (results['B1']['test_accuracy'] - baseline) * 100
        b2_improvement = (results['B2']['test_accuracy'] - baseline) * 100

        print(f"\nExperimento {exp_name}:")
        print(f"  Clasificador A (baseline): {baseline:.4f}")
        print(f"  Mejora B1 vs A: {b1_improvement:+.2f}%")
        print(f"  Mejora B2 vs A: {b2_improvement:+.2f}%")

    # Effect of the amount of labeled data
    print("\n📉 IMPACTO DE LA CANTIDAD DE DATOS ETIQUETADOS:")
    print("-" * 50)

    for clf in classifier_keys:
        acc_70_30 = results_70_30[clf]['test_accuracy']
        acc_90_10 = results_90_10[clf]['test_accuracy']
        diff = (acc_70_30 - acc_90_10) * 100

        print(f"Clasificador {clf}: 70-30 vs 90-10 = {diff:+.2f}%")

    # Quantization results
    print("\n⚡ RESULTADOS DE CUANTIZACIÓN:")
    print("-" * 50)

    for model_name, quant_results in quantization_results.items():
        print(f"\n{model_name}:")
        print(f"  Speedup: {quant_results['speedup']:.2f}x")
        print(f"  Compresión: {quant_results['compression_ratio']:.2f}x")
        print(f"  Tamaño original: {quant_results['original_size_mb']:.2f} MB")
        print(f"  Tamaño cuantizado: {quant_results['quantized_size_mb']:.2f} MB")

    # Each conclusion is checked against the measured results
    pretraining_helps = all(
        results[clf]['test_accuracy'] > results['A']['test_accuracy']
        for _, results in experiments
        for clf in ('B1', 'B2')
    )
    finetuning_wins = all(
        results['B2']['test_accuracy'] > results['B1']['test_accuracy']
        for _, results in experiments
    )
    more_labels_help = all(
        results_70_30[clf]['test_accuracy'] > results_90_10[clf]['test_accuracy']
        for clf in classifier_keys
    )
    quantization_helps = bool(quantization_results) and all(
        quant['speedup'] > 1 and quant['compression_ratio'] > 1
        for quant in quantization_results.values()
    )

    print("\n🎯 CONCLUSIONES PRINCIPALES:")
    print("-" * 50)
    if pretraining_helps:
        print("1. El preentrenamiento con autoencoders U-Net mejora el rendimiento en ambos experimentos")
    else:
        print("1. El preentrenamiento con autoencoders U-Net NO mejora el rendimiento en todos los casos")
    if finetuning_wins:
        print("2. El fine-tuning completo (B2) supera al encoder congelado (B1) en ambos experimentos")
    else:
        print("2. El fine-tuning completo (B2) NO supera al encoder congelado (B1) en todos los casos")
    if more_labels_help:
        print("3. Mayor cantidad de datos etiquetados (30% vs 10%) mejora los resultados de todos los clasificadores")
    else:
        print("3. Mayor cantidad de datos etiquetados (30% vs 10%) NO mejora los resultados de todos los clasificadores")
    if quantization_helps:
        print("4. La cuantización reduce el tamaño y el tiempo de inferencia en todos los modelos evaluados")
    else:
        print("4. La cuantización NO reduce a la vez el tamaño y el tiempo de inferencia en todos los modelos evaluados")
    # No ablation without skip connections is run, so nothing can be concluded about them
    print("5. El efecto de las skip connections del U-Net no se evalúa en este experimento")

    # Final summary logged to W&B
    wandb.log({
        'experiment_complete': True,
        'best_70_30_accuracy': best_70_30[1]['test_accuracy'],
        'best_90_10_accuracy': best_90_10[1]['test_accuracy'],
        'best_overall_model': best_70_30[0] if best_70_30[1]['test_accuracy'] > best_90_10[1]['test_accuracy'] else best_90_10[0]
    })

    print("\n✅ Experimento completado exitosamente!")
    print(f"📝 Resultados guardados en Wandb: {wandb.run.url}")

# Generate the final report
generate_final_report()


In [None]:
# Close the W&B run
wandb.finish()
# "\n" (not "\\n") so a blank line is printed instead of a literal backslash-n
print("\n🏁 Experimento finalizado. Wandb cerrado correctamente.")
