<div style="text-align: center;">
<a target="_blank" href="https://colab.research.google.com/github/miquelmn/aa_2526/blob/main/07_Batch_normalization/Batch_normalization.ipynb">
  <img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/>
</a>
</div>

# Batch Normalization

## Objectius

En aquesta pràctica, ampliarem el treball realitzat amb AlexNet i *transfer learning*, incorporant tècniques de regularització i optimització per millorar el rendiment del model. Els objectius són:

- **Implementar Batch Normalization**: afegir capes de normalització per estabilitzar i accelerar l'entrenament.
- **Comparar resultats**: analitzar l'impacte de cada tècnica en el rendiment final del model.
- **Optimització d'hiperparàmetres**: provar diferents configuracions per trobar la millor combinació.

Aquest enfocament permetrà comprendre com les tècniques vistes a teoria milloren la generalització i eviten l'overfitting en problemes reals de classificació d'imatges.

Una segona part serà emprar els nous models vists a classe de teoria.

## Introducció

### Batch Normalization

La Batch Normalization és una tècnica que normalitza les activacions de cada capa durant l'entrenament, utilitzant la mitjana i la desviació estàndard del mini-batch actual. Els seus principals avantatges són:

- **Accelera l'entrenament**: permet utilitzar learning rates més alts.
- **Redueix la sensibilitat a la inicialització**: els pesos inicials tenen menys impacte.
- **Actua com a regularitzador**: redueix la necessitat de Dropout en alguns casos.
- **Millora la convergència**: facilita que el model arribi a millors mínims.


La formulació d'aquesta operació, tal com heu vist a classe de teoria, és la següent:

$$ \hat{x}_i = \frac{x_i - \mu_B}{\sqrt{\sigma_B^2 + \epsilon}}$$

$$ y_i = \gamma \hat{x}_i + \beta, $$

on $\gamma$ i $\beta$ són paràmetres entrenables.

Per implementar-ho feim operacions diferents a entrenament i validació.

In [15]:
import torch
from torch import nn
import torch.optim as optim
from torchvision import datasets, models, transforms
from torch.optim import Adam
import time, os
from PIL import Image
from torch.optim import lr_scheduler
from tempfile import TemporaryDirectory
from torch.utils.data import DataLoader, random_split



class MyBatchNorm1d(nn.Module):
    def __init__(self, num_features, eps=1e-5, momentum=0.1):
        super().__init__()
        self.num_features = num_features

        # Evitar divisió per 0
        self.eps = eps

        # Momentum
        self.momentum = momentum

        # Paràmetres aprenables: gamma (weight) i beta (bias)
        self.gamma = nn.Parameter(torch.ones(num_features))
        self.beta = nn.Parameter(torch.zeros(num_features))

        # Estadístiques que s’acumulen durant l’entrenament però que no són paràmetres
        self.register_buffer("running_mean", torch.zeros(num_features))
        self.register_buffer("running_var", torch.ones(num_features))

    def forward(self, x):
        if self.training:
            # Calcular mitjana i variància del batch
            batch_mean = x.mean(dim=0)
            batch_var = x.var(dim=0, unbiased=False)

            # Actualitzar estadístiques globals
            self.running_mean = (1-self.momentum)*self.running_mean + self.momentum*batch_mean
            self.running_var = (1-self.momentum)*self.running_var + self.momentum*batch_var
            

            # Normalitzar el batch actual
            x_hat = (x-batch_mean) / torch.sqrt(batch_var + self.eps) 
        else:
            # En mode d’avaluació, s’usen les estadístiques acumulades
            x_hat = (x - self.running_mean) / torch.sqrt(self.running_var + self.eps)

        # Aplicar gamma i beta
        y = self.gamma * x_hat + self.beta
        return y


In [16]:
BATCH_SIZE = 32
EPOCHS = 15
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {DEVICE}")


imagenet_mean = [0.485, 0.456, 0.406]
imagenet_std  = [0.229, 0.224, 0.225]


train_tf = transforms.Compose([
    transforms.Resize(256),
    transforms.RandomResizedCrop(224),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize(imagenet_mean, imagenet_std),
])

eval_tf = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(imagenet_mean, imagenet_std),
])


data_root = '/Users/dylancanning/Desktop/UIB/AprenentatgeAutomatic/aa_2526/data/tiny-imagenet-200'

train_full = datasets.ImageFolder(root=f'{data_root}/train', transform=train_tf)
test_ds    = datasets.ImageFolder(root=f'{data_root}/test',  transform=eval_tf)


val_size = int(0.2 * len(train_full))
train_size = len(train_full) - val_size
train_ds, val_ds = random_split(train_full, [train_size, val_size])


num_workers = 0
pin_memory = False

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,
                          num_workers=num_workers, pin_memory=pin_memory)
val_loader   = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False,
                          num_workers=num_workers, pin_memory=pin_memory)
test_loader  = DataLoader(test_ds, batch_size=BATCH_SIZE, shuffle=False,
                          num_workers=num_workers, pin_memory=pin_memory)



print(f"Train: {len(train_ds)} | Val: {len(val_ds)} | Test: {len(test_ds)}")

Using device: cpu
Train: 80000 | Val: 20000 | Test: 10000


In [17]:
alex = models.alexnet(weights=True)

print("-" * 50)
print("Arquitectura AlexNet")
print("-" * 50)
print(alex)



--------------------------------------------------
Arquitectura AlexNet
--------------------------------------------------
AlexNet(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))
    (1): ReLU(inplace=True)
    (2): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (3): Conv2d(64, 192, kernel_size=(5, 5), stride=(1, 1), padding=(2, 2))
    (4): ReLU(inplace=True)
    (5): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
    (6): Conv2d(192, 384, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (7): ReLU(inplace=True)
    (8): Conv2d(384, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (9): ReLU(inplace=True)
    (10): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): MaxPool2d(kernel_size=3, stride=2, padding=0, dilation=1, ceil_mode=False)
  )
  (avgpool): AdaptiveAvgPool2d(output_size=(6, 6))
  (classi

In [18]:
alex.features[0]

Conv2d(3, 64, kernel_size=(11, 11), stride=(4, 4), padding=(2, 2))

In [19]:
alex = models.alexnet(weights=True)

In [None]:

alex.classifier = nn.Sequential(
    torch.nn.Linear(9216, 1024),
    MyBatchNorm1d(1024),
    nn.ReLU(),
    torch.nn.Linear(1024, 1024),
    MyBatchNorm1d(1024),
    nn.ReLU(),
    torch.nn.Linear(1024, 512),
    MyBatchNorm1d(512),
    nn.ReLU(),
    torch.nn.Linear(512, 200),  
    nn.Softmax(dim=1)
)  

In [None]:
loss_fn = nn.CrossEntropyLoss()
learning_rate = 1e-3 
optimizer = optim.Adam(alex.parameters(), lr=learning_rate)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model = alex.to(device)

In [22]:
def get_batch_accuracy(output, y, N):
    zero_tensor = torch.tensor([0]).to(DEVICE)
    pred = torch.gt(output, zero_tensor)
    correct = pred.eq(y.view_as(pred)).sum().item()
    return correct / N

def train(model, check_grad=False):
    total_loss = 0
    total_correct = 0

    model.train()
    for x, y in train_loader:
        x, y = x.to(DEVICE), y.to(DEVICE)
        output = model(x)

        optimizer.zero_grad()
        batch_loss = loss_fn(output, y)
        batch_loss.backward()
        optimizer.step()

        total_loss += batch_loss.item()
        preds = output.argmax(dim=1)
        total_correct += (preds == y).sum().item()

    acc = total_correct / len(train_loader.dataset)
    print(f"Train - Loss: {total_loss:.4f} Accuracy: {acc:.4f}")

def validate(model):
    total_loss = 0
    total_correct = 0

    model.eval()
    with torch.no_grad():
        for x, y in test_loader:
            x, y = x.to(DEVICE), y.to(DEVICE)
            output = model(x)
            total_loss += loss_fn(output, y).item()
            preds = output.argmax(dim=1)
            total_correct += (preds == y).sum().item()

    acc = total_correct / len(test_loader.dataset)
    print(f"Test - Loss: {total_loss:.4f} Accuracy: {acc:.4f}")


In [23]:
epochs = 10

for epoch in range(epochs):
    print('Epoch: {}'.format(epoch))
    train(model, check_grad=False)
    validate(model)

Epoch: 0


KeyboardInterrupt: 