<div style="text-align: center;">
<a target="_blank" href="https://colab.research.google.com/github/bmalcover/aa_2425/blob/main/10_VGG_Custom_Dataset/VGG_Custom.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>
</div>

In [2]:
import os

import torch
from torch import nn
import torch.optim as optim
from tqdm.auto import tqdm

from torchvision import datasets, models, transforms
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
import xml.etree.ElementTree as ET

import cv2

# Introducció

En aquesta pràctica treballarem amb l'arquitectura ResNet (Residual Network), centrant-nos en el concepte clau dels blocs residuals, que ja hem explorat a classe. Recordem breument la principal innovació d'aquestes: el bloc residual. Aquest introdueix una connexió directa (*shortcut connection*) entre l'entrada i la sortida d'un grup de capes, tal com es representa a continuació:

$$
\mathbf{y} = \mathcal{F}(\mathbf{x}, \{W_i\}) + \mathbf{x}
$$

On:
- $\mathbf{x}$ és l'entrada del bloc.
- $\mathcal{F}(\mathbf{x}, \{W_i\})$ és la transformació no lineal aplicada per les capes internes (normalment convolucions, normalització i ReLU).
- $\mathbf{y}$ és la sortida, que combina l'entrada original xx amb la transformació.

Aquest simple però efectiu mecanisme permet que el model "aprengui" només la diferència (o residual) entre l'entrada i la sortida esperada, en lloc de l'objectiu complet. Això facilita l'entrenament i redueix el risc de degradació del rendiment en xarxes profundes.

A classe hem vist que aquests blocs residuals són la base de diferents variants de ResNet, com ara ResNet-18, ResNet-50, etc., amb diferències en la profunditat i el nombre de blocs. Ara posarem en pràctica aquest coneixement treballant amb ResNets per a tasques de classificació d’imatges

# Cream mòduls amb Pytorch

En aquesta pràctica aprendrem a definir mòduls personalitzats en PyTorch utilitzant la classe base ``torch.nn.Module``. Aquesta classe permet encapsular les capes i operacions d’una xarxa neuronal, facilitant-ne la reutilització i el manteniment.

**Pasos per crear un mòdul en Pytorch:**
1. **Heretar de nn.Module**. Tots els mòduls personalitzats en PyTorch han de derivar de la classe base torch.nn.Module.
2. **Definir les capes en el constructor __init__.** Al constructor del mòdul (__init__), s’han d’inicialitzar les capes que s’utilitzaran, com ara convolucions, capes lineals o funcions d’activació.
3. **Implementar la funció forward.** Aquesta funció defineix com flueixen les dades a través del mòdul. Aquí s’apliquen les capes definides al constructor de manera seqüencial o segons sigui necessari.


## Cream un bloc residual

**Revisar sessió teòrica**

El nostre bloc residual tendrà dues capes convolucionals, batch norm i emprarà ReLU.

In [3]:
class ResidualBlock(nn.Module):
    def __init__(self, in_channels, out_channels) -> None:
        super().__init__()
        self.conv1 = nn.Conv2d(
            in_channels=in_channels,
            out_channels=out_channels,
            kernel_size=(3, 3),
            padding='same',
            bias=False
        )
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()

        self.conv2 = nn.Conv2d(
            in_channels=out_channels,
            out_channels=out_channels,
            kernel_size=(3, 3),
            padding='same',
            bias=False
        )
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Ajustar la dimensión de la entrada si es necesario
        self.adjust_input = nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False) if in_channels != out_channels else None

    def forward(self, x):
        identity = x

        out = self.conv1(x)
        out = self.bn1(out)
        out = self.relu(out)

        out = self.conv2(out)
        out = self.bn2(out)

        if self.adjust_input is not None:
            identity = self.adjust_input(identity)

        out += identity
        out = self.relu(out)

        return out

## Una VGG16 residual

Una vegada el tenim implementat farem dos models:
1. Model VGG16 **normal**. Un model com el que vàrem veure la setmana passada sense preentranement.
2. Model VGG16 **ampliat**. Heu d'afegir a una VGG16 dos blocs residuals al final de l'extractor de característiques. Per fer-ho s'ha d'emprar la mateixa estratègia que heu vist a les sessions anteriors per fer fine-tunning.

Entrena ambdós models amb el conjunt de dades [Fashion MNIST](https://github.com/zalandoresearch/fashion-mnist), i contesta a la següent pregunta:

- És el mateix resultat una xarxa que l'altra? Si és així o no diguès per què?

In [8]:
import torch
from torchvision import transforms
from torch.utils.data import DataLoader, Dataset
import requests
import os
import gzip
import numpy as np
from PIL import Image

class FashionMNIST(Dataset):
    def __init__(self, root, train=True, transform=None, download=False):
        self.root = root
        self.train = train
        self.transform = transform
        self.download = download

        if self.download:
            self._download()

        if self.train:
            self.data, self.targets = self._load_data('train-images-idx3-ubyte.gz', 'train-labels-idx1-ubyte.gz')
        else:
            self.data, self.targets = self._load_data('t10k-images-idx3-ubyte.gz', 't10k-labels-idx1-ubyte.gz')

    def _download(self):
        urls = {
            'train-images-idx3-ubyte.gz': 'http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-images-idx3-ubyte.gz',
            'train-labels-idx1-ubyte.gz': 'http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/train-labels-idx1-ubyte.gz',
            't10k-images-idx3-ubyte.gz': 'http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-images-idx3-ubyte.gz',
            't10k-labels-idx1-ubyte.gz': 'http://fashion-mnist.s3-website.eu-central-1.amazonaws.com/t10k-labels-idx1-ubyte.gz'
        }

        os.makedirs(self.root, exist_ok=True)

        for filename, url in urls.items():
            filepath = os.path.join(self.root, filename)
            if not os.path.exists(filepath):
                print(f'Downloading {filename}...')
                response = requests.get(url)
                with open(filepath, 'wb') as f:
                    f.write(response.content)

    def _load_data(self, images_file, labels_file):
        with gzip.open(os.path.join(self.root, images_file), 'rb') as imgpath:
            data = np.frombuffer(imgpath.read(), np.uint8, offset=16).reshape(-1, 28, 28)
        with gzip.open(os.path.join(self.root, labels_file), 'rb') as lblpath:
            targets = np.frombuffer(lblpath.read(), np.uint8, offset=8)
        return data, targets

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        img, target = self.data[index], self.targets[index]
        img = Image.fromarray(img, mode='L')

        if self.transform is not None:
            img = self.transform(img)

        return img, target

BATCH_SIZE = 4
EPOCHS = 5

# Definir las transformaciones
transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Lambda(lambda x: x.repeat(3, 1, 1)),  # Replicar el canal 3 veces
    transforms.Normalize((0.1307,), (0.3081,))
])

# Descargar y cargar el dataset Fashion MNIST
train_dataset = FashionMNIST(root='./data', train=True, download=True, transform=transform)
test_dataset = FashionMNIST(root='./data', train=False, download=True, transform=transform)

# Crear DataLoaders
train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=False)

In [5]:
vgg16 = models.vgg16(weights=False)



In [9]:
loss_fn = nn.BCEWithLogitsLoss()
learning_rate = 1e-3  # Hiperparàmetre
optimizer = optim.Adam(vgg16.parameters(), lr=learning_rate)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = vgg16.to(device)

In [10]:
from sklearn.metrics import accuracy_score

running_loss = []
running_acc = []

running_test_loss = []
running_test_acc_cnn = []

for t in tqdm(range(EPOCHS), desc="Èpoques"):
    batch_loss = 0
    batch_acc = 0

    i_batch = 1
    # Iteram els batches.
    for i_batch, (x, y) in tqdm(enumerate(train_loader), desc=f"Batches (Època {t + 1})"):
        vgg16.train()  # Posam el model a mode entranament.

        optimizer.zero_grad()

        # 1. PREDICCIÓ
        y_pred = vgg16(x.to(device))

        # 2. CALCUL DE LA PÈRDUA
        # Computa la pèrdua: l'error de predicció vs el valor correcte
        # Es guarda la pèrdua en un array per futures visualitzacions

        loss = loss_fn(y_pred, y.to(device))

        #3. GRADIENT
        vgg16.zero_grad()
        loss.backward()

        # Actualitza els pesos utilitzant l'algorisme d'actualització
        #4. OPTIMITZACIO
        with torch.no_grad():
            optimizer.step()

        # 5. EVALUAM EL MODEL
        vgg16.eval()  # Mode avaluació de la xarxa

        y_pred = vgg16(x.to(device)).detach().cpu().numpy()
        batch_loss += (loss_fn(y_pred, y).detach())

        y_pred_class = (y_pred > 0.5).double()
        batch_acc += accuracy_score(y, y_pred_class)

    running_loss.append(batch_loss / (i_batch + 1))
    running_acc.append(batch_acc / (i_batch + 1))

    batch_test_loss = 0
    batch_test_acc = 0

    vgg16.eval()
    for i_batch, (x, y) in enumerate(test_loader):
        y_pred = vgg16(x.to(device))
        batch_test_loss += (loss_fn(y_pred, y.to(device)).detach())

        y_pred_class = (y_pred > 0.5).double()
        batch_test_acc += accuracy_score(y, y_pred_class)

    running_test_loss.append(batch_test_loss / (i_batch + 1))
    running_test_acc_cnn.append(batch_test_acc / (i_batch + 1))

Èpoques:   0%|          | 0/5 [00:00<?, ?it/s]

Batches (Època 1): 0it [00:00, ?it/s]

RuntimeError: Given input size: (512x1x1). Calculated output size: (512x0x0). Output size is too small