# Laboratorio 8

In [1]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
import matplotlib.pyplot as plt
import pandas as pd
import time, copy, os
from typing import Dict, Any

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
device

device(type='cpu')

## 1. Dataset y DataLoaders

In [2]:
DATA_DIR = './data'
BATCH_SIZE = 64
NUM_WORKERS = 2
NUM_CLASSES = 10

imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std = [0.229, 0.224, 0.225]

train_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(imagenet_mean, imagenet_std),
])
test_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(imagenet_mean, imagenet_std),
])

train_dataset = datasets.CIFAR10(root=DATA_DIR, train=True, download=True, transform=train_transform)
test_dataset = datasets.CIFAR10(root=DATA_DIR, train=False, download=True, transform=test_transform)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True, num_workers=NUM_WORKERS)
val_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False, num_workers=NUM_WORKERS)

dataloaders = {'train': train_loader, 'val': val_loader}
dataset_sizes = {'train': len(train_dataset), 'val': len(test_dataset)}
class_names = train_dataset.classes
dataset_sizes

100%|██████████| 170M/170M [00:02<00:00, 70.4MB/s]


{'train': 50000, 'val': 10000}

## 2. Funciones auxiliares


In [3]:
def train_model(model, criterion, optimizer, dataloaders, dataset_sizes, device, num_epochs=5, scheduler=None):
    since = time.time()
    best_model_wts = copy.deepcopy(model.state_dict())
    best_acc = 0.0
    history = {
        'train_loss': [], 'val_loss': [],
        'train_acc': [], 'val_acc': []
    }

    for epoch in range(num_epochs):
        print(f'Epoch {epoch+1}/{num_epochs}')
        print('-' * 20)

        for phase in ['train', 'val']:
            if phase == 'train':
                model.train()
            else:
                model.eval()

            running_loss = 0.0
            running_corrects = 0

            for inputs, labels in dataloaders[phase]:
                inputs = inputs.to(device)
                labels = labels.to(device)

                optimizer.zero_grad()

                with torch.set_grad_enabled(phase == 'train'):
                    outputs = model(inputs)
                    _, preds = torch.max(outputs, 1)
                    loss = criterion(outputs, labels)

                    if phase == 'train':
                        loss.backward()
                        optimizer.step()

                running_loss += loss.item() * inputs.size(0)
                running_corrects += torch.sum(preds == labels.data)

            epoch_loss = running_loss / dataset_sizes[phase]
            epoch_acc = running_corrects.double() / dataset_sizes[phase]

            history[f'{phase}_loss'].append(epoch_loss)
            history[f'{phase}_acc'].append(epoch_acc.item())

            print(f'{phase} Loss: {epoch_loss:.4f} Acc: {epoch_acc:.4f}')

            if phase == 'val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_model_wts = copy.deepcopy(model.state_dict())

        if scheduler is not None:
            scheduler.step()

    time_elapsed = time.time() - since
    print(f'Training complete in {time_elapsed // 60:.0f}m {time_elapsed % 60:.0f}s')
    print(f'Best val Acc: {best_acc:4f}')

    model.load_state_dict(best_model_wts)
    return model, history


## a) Evaluación sin entrenamiento adicional

In [4]:
def get_vgg16(num_classes=10, freeze_features=False, pretrained=True):
    model = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1 if pretrained else None)
    if freeze_features:
        for p in model.features.parameters():
            p.requires_grad = False
    in_features = model.classifier[-1].in_features
    model.classifier[-1] = nn.Linear(in_features, num_classes)
    return model

baseline_model = get_vgg16(num_classes=NUM_CLASSES, freeze_features=False, pretrained=True).to(device)
criterion = nn.CrossEntropyLoss()

baseline_model.eval()
running_loss = 0.0
running_corrects = 0
for inputs, labels in dataloaders['val']:
    inputs = inputs.to(device)
    labels = labels.to(device)
    with torch.no_grad():
        outputs = baseline_model(inputs)
        _, preds = torch.max(outputs, 1)
        loss = criterion(outputs, labels)
    running_loss += loss.item() * inputs.size(0)
    running_corrects += torch.sum(preds == labels.data)

baseline_val_loss = running_loss / dataset_sizes['val']
baseline_val_acc = running_corrects.double() / dataset_sizes['val']
print('Baseline val loss:', baseline_val_loss)
print('Baseline val acc:', baseline_val_acc.item())

Downloading: "https://download.pytorch.org/models/vgg16-397923af.pth" to /root/.cache/torch/hub/checkpoints/vgg16-397923af.pth


100%|██████████| 528M/528M [00:03<00:00, 155MB/s]


Baseline val loss: 2.371482478713989
Baseline val acc: 0.1065


## b) Feature Extraction

In [None]:
feature_model = get_vgg16(num_classes=NUM_CLASSES, freeze_features=True, pretrained=True).to(device)
criterion = nn.CrossEntropyLoss()
params_to_update = [p for p in feature_model.parameters() if p.requires_grad]
optimizer_ft = optim.Adam(params_to_update, lr=1e-3)

feature_model, history_feature = train_model(
    feature_model,
    criterion,
    optimizer_ft,
    dataloaders,
    dataset_sizes,
    device,
    num_epochs=3
)


## c) Fine-Tuning

In [None]:
finetune_model = get_vgg16(num_classes=NUM_CLASSES, freeze_features=False, pretrained=True).to(device)
criterion = nn.CrossEntropyLoss()
optimizer_ft_all = optim.Adam(finetune_model.parameters(), lr=1e-4)
scheduler = optim.lr_scheduler.StepLR(optimizer_ft_all, step_size=2, gamma=0.1)

finetune_model, history_finetune = train_model(
    finetune_model,
    criterion,
    optimizer_ft_all,
    dataloaders,
    dataset_sizes,
    device,
    num_epochs=3,
    scheduler=scheduler
)


## Evaluación y comparación de resultados


In [None]:
def plot_history(history: Dict[str, Any], title: str):
    plt.figure()
    plt.plot(history['train_loss'], label='train loss')
    plt.plot(history['val_loss'], label='val loss')
    plt.title(f'{title} - Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()
    plt.show()

    plt.figure()
    plt.plot(history['train_acc'], label='train acc')
    plt.plot(history['val_acc'], label='val acc')
    plt.title(f'{title} - Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy')
    plt.legend()
    plt.show()

plot_history(history_feature, 'Feature Extraction')
plot_history(history_finetune, 'Fine-Tuning')

### Tabla comparativa

In [None]:
results = []

results.append({
    'escenario': 'a) baseline (sin entrenamiento)',
    'train_acc': None,
    'train_loss': None,
    'val_acc': baseline_val_acc.item(),
    'val_loss': baseline_val_loss,
})

def best_from_history(h):
    best_idx = int(torch.tensor(h['val_acc']).argmax().item())
    return h['train_acc'][best_idx], h['train_loss'][best_idx], h['val_acc'][best_idx], h['val_loss'][best_idx]

fa_tr_acc, fa_tr_loss, fa_val_acc, fa_val_loss = best_from_history(history_feature)
results.append({
    'escenario': 'b) feature extraction',
    'train_acc': fa_tr_acc,
    'train_loss': fa_tr_loss,
    'val_acc': fa_val_acc,
    'val_loss': fa_val_loss,
})

fi_tr_acc, fi_tr_loss, fi_val_acc, fi_val_loss = best_from_history(history_finetune)
results.append({
    'escenario': 'c) fine-tuning',
    'train_acc': fi_tr_acc,
    'train_loss': fi_tr_loss,
    'val_acc': fi_val_acc,
    'val_loss': fi_val_loss,
})

df_results = pd.DataFrame(results)
df_results