
# MAC05921 - Task 2: MNIST (NN × CNN) with PyTorch

This notebook runs the requested exploration: it trains a **neural network (NN)** and a **convolutional neural network (CNN)** on MNIST,
comparing their *loss* behavior, overfitting, *early stopping*, and validation/test performance.
It also **generates figures** and a **results CSV** to make the LaTeX report easier to write.

> **How to use**
> 1. Make sure you have internet access (to download MNIST) and a GPU if possible.
> 2. Run the cells in order.
> 3. At the end, the figures are in `figures/` and the results in `resultados.csv`.
> 4. Optional: run the extra explorations (noise, class imbalance, t-SNE, Grad-CAM).



## Setup

In [1]:

# Versions and imports
import sys, os, math, time, random, csv
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader, Subset, random_split
from torchvision import datasets, transforms

import numpy as np
import matplotlib.pyplot as plt
from sklearn.metrics import confusion_matrix, classification_report
from sklearn.manifold import TSNE

DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Torch:", torch.__version__)
print("CUDA available?", torch.cuda.is_available())
print("Device:", DEVICE)

BASE = Path(".")
OUT_FIG = BASE / "figures"
OUT_FIG.mkdir(parents=True, exist_ok=True)


Torch: 1.10.1+cu102
CUDA available? True
Device: cuda



## Basic configuration

- `samples_per_class`: number of examples per class drawn from the MNIST training set (budget control); this subset is then split into training and validation.
- `val_split`: fraction of that subset held out for **validation**.
- `epochs`: maximum number of epochs; *early stopping* may stop sooner.
- `patience`: *early stopping* patience (number of consecutive epochs without improvement in validation loss).
- `batch_size`, `lr`: mini-batch size and learning rate, as usual.
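
As a quick sanity check on how these settings interact, the sketch below computes the subset and split sizes implied by `samples_per_class` and `val_split`. It assumes the 10 MNIST classes and the same `int(...)` truncation that the notebook applies to the validation count; the helper name `split_sizes` is illustrative and not part of the notebook.

```python
def split_sizes(samples_per_class, val_split, num_classes=10):
    """Return (n_train, n_val) for a balanced subset split by a validation fraction."""
    n_total = samples_per_class * num_classes
    n_val = int(val_split * n_total)   # truncates toward zero
    n_train = n_total - n_val
    return n_train, n_val

print(split_sizes(300, 0.2))   # (2400, 600)
```

With the default values this gives 2400 training and 600 validation examples, which matches the sizes reported by the data cell.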


In [4]:

# Initial hyperparameters
seed = 0
rng = np.random.default_rng(seed)
torch.manual_seed(seed)
random.seed(seed)

samples_per_class = 300      # vary across runs (e.g. 50, 100, 300, 1000...)
val_split = 0.2
epochs = 30
patience = 5
batch_size = 128
lr = 1e-3

# Standard MNIST normalization
mean, std = (0.1307,), (0.3081,)

train_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])

test_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize(mean, std),
])



## Data: MNIST + per-class subsampling

We download MNIST via `torchvision.datasets.MNIST`. For training we use **only** `samples_per_class` examples per class.
To explore **noise** (salt-and-pepper) and **shifts/scaling**, enable the **data augmentation** block further down.
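
The balanced subsampling idea can be illustrated on a toy label list. This is a minimal, dependency-free sketch: it uses the standard library `random` module rather than the NumPy generator used in the notebook, and the function name `balanced_subsample` and the toy labels are assumptions made for illustration.

```python
import random
from collections import defaultdict

def balanced_subsample(labels, samples_per_class, seed=0):
    """Pick up to `samples_per_class` indices per label, then shuffle the result."""
    rng = random.Random(seed)
    by_class = defaultdict(list)
    for idx, label in enumerate(labels):
        by_class[label].append(idx)
    chosen = []
    for label in sorted(by_class):
        idxs = by_class[label]
        rng.shuffle(idxs)                        # random order within the class
        chosen.extend(idxs[:samples_per_class])  # keep the first few after shuffling
    rng.shuffle(chosen)                          # mix the classes together
    return chosen

# Toy labels: 3 classes with 5, 4 and 6 examples respectively.
toy_labels = [0] * 5 + [1] * 4 + [2] * 6
picked = balanced_subsample(toy_labels, samples_per_class=3)
counts = {c: sum(toy_labels[i] == c for i in picked) for c in (0, 1, 2)}
print(counts)   # {0: 3, 1: 3, 2: 3}
```

Shuffling within each class before slicing is what makes the selection random; the final shuffle only removes the class ordering from the index list.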


In [5]:

# Download (requires internet access: local machine, Colab, or Kaggle)
DATA_DIR = BASE / "data"
DATA_DIR.mkdir(exist_ok=True, parents=True)

full_train = datasets.MNIST(root=DATA_DIR, train=True, download=True, transform=train_transform)
full_test  = datasets.MNIST(root=DATA_DIR, train=False, download=True, transform=test_transform)

# Balanced subsampling for training (samples_per_class examples per class)
labels = np.array(full_train.targets)
indices_per_class = [np.where(labels == c)[0] for c in range(10)]

chosen_indices = []
for c in range(10):
    idxs = indices_per_class[c]
    rng.shuffle(idxs)
    chosen_indices.extend(idxs[:samples_per_class])
chosen_indices = np.array(chosen_indices)
rng.shuffle(chosen_indices)

subset_train = Subset(full_train, chosen_indices.tolist())

# Train/validation split
n_total = len(subset_train)
n_val = int(val_split * n_total)
n_train = n_total - n_val
train_set, val_set = random_split(subset_train, [n_train, n_val], generator=torch.Generator().manual_seed(seed))

train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True, num_workers=2, pin_memory=True)
val_loader   = DataLoader(val_set,   batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)
test_loader  = DataLoader(full_test, batch_size=batch_size, shuffle=False, num_workers=2, pin_memory=True)

print(f"Train: {len(train_set)} | Validation: {len(val_set)} | Test: {len(full_test)}")


Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 404: Not Found

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-images-idx3-ubyte.gz to data/MNIST/raw/train-images-idx3-ubyte.gz

100.0%

Extracting data/MNIST/raw/train-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 404: Not Found

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/train-labels-idx1-ubyte.gz to data/MNIST/raw/train-labels-idx1-ubyte.gz

102.8%

Extracting data/MNIST/raw/train-labels-idx1-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Failed to download (trying next):
HTTP Error 404: Not Found

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-images-idx3-ubyte.gz to data/MNIST/raw/t10k-images-idx3-ubyte.gz

100.0%

Extracting data/MNIST/raw/t10k-images-idx3-ubyte.gz to data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Failed to download (trying next):
HTTP Error 404: Not Found

Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz
Downloading https://ossci-datasets.s3.amazonaws.com/mnist/t10k-labels-idx1-ubyte.gz to data/MNIST/raw/t10k-labels-idx1-ubyte.gz

112.7%

Extracting data/MNIST/raw/t10k-labels-idx1-ubyte.gz to data/MNIST/raw

Train: 2400 | Validation: 600 | Test: 10000






## Models

We define:
- a simple **MLP (NN)**: `Flatten -> Linear -> ReLU -> ... -> Linear`
- a compact **CNN**: conv → ReLU → pool → conv → ReLU → pool → Linear(s)

We also compute the **number of parameters** to keep the NN and the CNN within the same order of magnitude.
Adjust `hidden_mlp`, `cnn_c1`, `cnn_c2`, and `cnn_fc` to bring the totals closer.


In [6]:

def count_params(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

class MLP(nn.Module):
    def __init__(self, hidden=256, num_classes=10):
        super().__init__()
        self.net = nn.Sequential(
            nn.Flatten(),
            nn.Linear(28*28, hidden),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, hidden),
            nn.ReLU(inplace=True),
            nn.Linear(hidden, num_classes)
        )
    def forward(self, x):
        return self.net(x)

class SmallCNN(nn.Module):
    def __init__(self, c1=16, c2=32, fc=128, num_classes=10):
        super().__init__()
        self.conv1 = nn.Conv2d(1, c1, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(c1, c2, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2,2)
        self.fc1 = nn.Linear((28//4)*(28//4)*c2, fc)
        self.fc2 = nn.Linear(fc, num_classes)
    def forward(self, x):
        x = F.relu(self.conv1(x))   # 28x28 -> 28x28
        x = self.pool(x)            # -> 14x14
        x = F.relu(self.conv2(x))   # 14x14 -> 14x14
        x = self.pool(x)            # -> 7x7
        x = torch.flatten(x, 1)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return x

hidden_mlp = 256
cnn_c1, cnn_c2, cnn_fc = 16, 32, 128

mlp = MLP(hidden=hidden_mlp).to(DEVICE)
cnn = SmallCNN(c1=cnn_c1, c2=cnn_c2, fc=cnn_fc).to(DEVICE)

print("MLP parameters:", count_params(mlp))
print("CNN parameters:", count_params(cnn))


MLP parameters: 269322
CNN parameters: 206922
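
The two totals reported by `count_params` can be reproduced by hand from the layer shapes, which helps when tuning the widths to match parameter budgets. The sketch below assumes the default sizes from the cell above (`hidden=256`, `c1=16`, `c2=32`, `fc=128`, 3x3 kernels, a 7x7 feature map after two poolings); the helper names `linear_params` and `conv2d_params` are illustrative.

```python
def linear_params(n_in, n_out):
    return n_in * n_out + n_out            # weights + biases

def conv2d_params(c_in, c_out, k):
    return c_in * c_out * k * k + c_out    # one k x k kernel per (in, out) channel pair + biases

hidden = 256
mlp_total = (linear_params(28 * 28, hidden)
             + linear_params(hidden, hidden)
             + linear_params(hidden, 10))

c1, c2, fc = 16, 32, 128
cnn_total = (conv2d_params(1, c1, 3)
             + conv2d_params(c1, c2, 3)
             + linear_params(7 * 7 * c2, fc)
             + linear_params(fc, 10))

print(mlp_total, cnn_total)   # 269322 206922
```

In both models the first fully connected layer dominates (200,960 parameters in the MLP and 200,832 in the CNN), so `hidden` and `fc` are the most effective knobs for matching the totals.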



## Training loop, evaluation, and *early stopping*

We train with `CrossEntropyLoss`. *Early stopping* monitors the **validation loss** and keeps a copy of the best model, which is restored when training ends.
We also plot the **loss/accuracy curves**.
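
To make the stopping rule concrete, here is a small stand-alone simulation of patience-based early stopping on a list of validation losses. It is a sketch under stated assumptions: the function `early_stop_epoch` is hypothetical, the loss values are made up for illustration, and an epoch counts as an improvement only when the loss is strictly lower than the best so far minus `min_delta`.

```python
def early_stop_epoch(val_losses, patience, min_delta=0.0):
    """Return (stop_epoch, best_epoch); stop_epoch is None if training never stops early."""
    best, best_epoch, count = float("inf"), None, 0
    for epoch, loss in enumerate(val_losses, start=1):
        if loss < best - min_delta:        # strict improvement resets the counter
            best, best_epoch, count = loss, epoch, 0
        else:
            count += 1                     # one more epoch without improvement
            if count >= patience:
                return epoch, best_epoch
    return None, best_epoch

# Hypothetical validation losses, not taken from a real run.
losses = [0.90, 0.70, 0.60, 0.61, 0.62, 0.60, 0.63]
print(early_stop_epoch(losses, patience=3))   # (6, 3)
```

Here training stops at epoch 6, and the weights worth restoring are those from epoch 3; matching the best loss again at epoch 6 does not reset the counter.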


In [7]:

class EarlyStopping:
    def __init__(self, patience=5, min_delta=0.0):
        self.patience = patience
        self.min_delta = min_delta
        self.best = float("inf")
        self.count = 0
        self.stop = False

    def step(self, value):
        if value < self.best - self.min_delta:
            self.best = value
            self.count = 0
        else:
            self.count += 1
            if self.count >= self.patience:
                self.stop = True

def train_one_epoch(model, loader, optim, criterion):
    model.train()
    total_loss, correct, total = 0.0, 0, 0
    for x, y in loader:
        x, y = x.to(DEVICE, non_blocking=True), y.to(DEVICE, non_blocking=True)
        optim.zero_grad()
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        optim.step()
        total_loss += float(loss.item()) * y.size(0)
        pred = logits.argmax(1)
        correct += int((pred == y).sum().item())
        total += y.size(0)
    return total_loss/total, correct/total

@torch.no_grad()
def evaluate(model, loader, criterion):
    model.eval()
    total_loss, correct, total = 0.0, 0, 0
    all_y, all_p = [], []
    for x, y in loader:
        x, y = x.to(DEVICE, non_blocking=True), y.to(DEVICE, non_blocking=True)
        logits = model(x)
        loss = criterion(logits, y)
        total_loss += float(loss.item()) * y.size(0)
        pred = logits.argmax(1)
        correct += int((pred == y).sum().item())
        total += y.size(0)
        all_y.append(y.cpu().numpy())
        all_p.append(pred.cpu().numpy())
    return total_loss/total, correct/total, np.concatenate(all_y), np.concatenate(all_p)

def run_training(model, name="model", epochs=30, lr=1e-3, patience=5):
    criterion = nn.CrossEntropyLoss()
    optim = torch.optim.Adam(model.parameters(), lr=lr)
    es = EarlyStopping(patience=patience)
    history = {"epoch": [], "train_loss": [], "train_acc": [], "val_loss": [], "val_acc": []}
    best_state = None

    for ep in range(1, epochs+1):
        tr_loss, tr_acc = train_one_epoch(model, train_loader, optim, criterion)
        va_loss, va_acc, _, _ = evaluate(model, val_loader, criterion)
        history["epoch"].append(ep)
        history["train_loss"].append(tr_loss); history["train_acc"].append(tr_acc)
        history["val_loss"].append(va_loss);   history["val_acc"].append(va_acc)
        print(f"[{name}] epoch {ep:02d} | train_loss={tr_loss:.4f} acc={tr_acc:.4f} | val_loss={va_loss:.4f} acc={va_acc:.4f}")

        # Same strict-improvement test as EarlyStopping.step, checked before es.best is updated
        if va_loss < es.best - es.min_delta:
            best_state = {k: v.cpu().clone() for k, v in model.state_dict().items()}
        es.step(va_loss)
        if es.stop:
            print(f"Early stopping at epoch {ep}. Best val_loss={es.best:.4f}.")
            break

    # restore the best checkpoint
    if best_state is not None:
        model.load_state_dict(best_state)
    return history

def plot_curves(hist, title, savepath):
    # One plot per figure, no custom styles/colors (assignment guideline)
    fig = plt.figure()
    xs = hist["epoch"]
    plt.plot(xs, hist["train_loss"], label="train_loss")
    plt.plot(xs, hist["val_loss"], label="val_loss")
    plt.xlabel("Epoch"); plt.ylabel("Loss"); plt.title(title + " - Loss")
    plt.legend()
    fig.savefig(savepath.replace(".png", "_loss.png"), bbox_inches="tight")
    plt.close(fig)

    fig = plt.figure()
    plt.plot(xs, hist["train_acc"], label="train_acc")
    plt.plot(xs, hist["val_acc"], label="val_acc")
    plt.xlabel("Epoch"); plt.ylabel("Accuracy"); plt.title(title + " - Accuracy")
    plt.legend()
    fig.savefig(savepath.replace(".png", "_acc.png"), bbox_inches="tight")
    plt.close(fig)



## Main experiment

- Train the **MLP** and the **CNN** on the configured subset.
- *Early stopping* on `val_loss`.
- **Loss/accuracy** curves saved to `figures/`.
- Evaluate on the **test set** and produce a **confusion matrix** plus a **classification report**.
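
For readers less familiar with the confusion matrix, the sketch below builds one by hand on a toy example and derives accuracy from its diagonal. It follows the rows-are-true, columns-are-predicted layout; the function names and the toy labels are assumptions for illustration, and the notebook itself relies on `sklearn.metrics.confusion_matrix`.

```python
def confusion_counts(y_true, y_pred, num_classes):
    """Rows are true labels, columns are predicted labels."""
    cm = [[0] * num_classes for _ in range(num_classes)]
    for t, p in zip(y_true, y_pred):
        cm[t][p] += 1
    return cm

def accuracy_from_cm(cm):
    correct = sum(cm[i][i] for i in range(len(cm)))   # diagonal = correct predictions
    total = sum(sum(row) for row in cm)
    return correct / total

y_true = [0, 0, 1, 1, 2, 2]
y_pred = [0, 1, 1, 1, 2, 0]
cm = confusion_counts(y_true, y_pred, num_classes=3)
print(cm)                     # [[1, 1, 0], [0, 2, 0], [1, 0, 1]]
print(accuracy_from_cm(cm))   # 0.6666666666666666
```

Off-diagonal entries show which digits get confused with which, which is the information the saved `cm_*.png` figures are meant to convey.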


In [8]:

def confusion_and_report(y_true, y_pred, prefix="mlp"):
    cm = confusion_matrix(y_true, y_pred, labels=list(range(10)))
    fig = plt.figure()
    plt.imshow(cm, interpolation='nearest')
    plt.title(f"Confusion matrix - {prefix.upper()}")
    plt.xlabel("Predicted"); plt.ylabel("True")
    fig.savefig(OUT_FIG / f"cm_{prefix}.png", bbox_inches="tight")
    plt.close(fig)

    print(f"\nClassification report - {prefix.upper()}:\n")
    print(classification_report(y_true, y_pred, digits=4))

# ----- MLP -----
mlp = MLP(hidden=hidden_mlp).to(DEVICE)
hist_mlp = run_training(mlp, name="MLP", epochs=epochs, lr=lr, patience=patience)
plot_curves(hist_mlp, "MLP", str(OUT_FIG / "mlp_curves.png"))
_, _, y_true_val, y_pred_val = evaluate(mlp, val_loader, nn.CrossEntropyLoss())
_, _, y_true_test, y_pred_test = evaluate(mlp, test_loader, nn.CrossEntropyLoss())
confusion_and_report(y_true_test, y_pred_test, prefix="mlp")

# ----- CNN -----
cnn = SmallCNN(c1=cnn_c1, c2=cnn_c2, fc=cnn_fc).to(DEVICE)
hist_cnn = run_training(cnn, name="CNN", epochs=epochs, lr=lr, patience=patience)
plot_curves(hist_cnn, "CNN", str(OUT_FIG / "cnn_curves.png"))
_, _, y_true_val_c, y_pred_val_c = evaluate(cnn, val_loader, nn.CrossEntropyLoss())
_, _, y_true_test_c, y_pred_test_c = evaluate(cnn, test_loader, nn.CrossEntropyLoss())
confusion_and_report(y_true_test_c, y_pred_test_c, prefix="cnn")

# Save a summary CSV for the report (csv is already imported in the setup cell).
# Note: final_val_acc is the last epoch's validation accuracy, while test_acc is
# measured on the best checkpoint restored by run_training.
with open(BASE / "resultados.csv", "w", newline="") as f:
    w = csv.writer(f)
    w.writerow(["modelo","best_val_loss","final_val_acc","test_acc","params"])
    w.writerow(["MLP", min(hist_mlp["val_loss"]), hist_mlp["val_acc"][-1], (y_true_test==y_pred_test).mean(), count_params(mlp)])
    w.writerow(["CNN", min(hist_cnn["val_loss"]), hist_cnn["val_acc"][-1], (y_true_test_c==y_pred_test_c).mean(), count_params(cnn)])
print("Figures saved to:", OUT_FIG)
print("Results in: resultados.csv")


[MLP] epoch 01 | train_loss=1.2727 acc=0.6596 | val_loss=0.6184 acc=0.7867
[MLP] epoch 02 | train_loss=0.4558 acc=0.8554 | val_loss=0.3915 acc=0.8817
[MLP] epoch 03 | train_loss=0.3105 acc=0.9083 | val_loss=0.3064 acc=0.9100
[MLP] epoch 04 | train_loss=0.2284 acc=0.9358 | val_loss=0.2923 acc=0.9133
[MLP] epoch 05 | train_loss=0.1878 acc=0.9425 | val_loss=0.3004 acc=0.9067
[MLP] epoch 06 | train_loss=0.1455 acc=0.9600 | val_loss=0.2478 acc=0.9267
[MLP] epoch 07 | train_loss=0.1111 acc=0.9738 | val_loss=0.2528 acc=0.9250
[MLP] epoch 08 | train_loss=0.0925 acc=0.9754 | val_loss=0.2448 acc=0.9250
[MLP] epoch 09 | train_loss=0.0679 acc=0.9821 | val_loss=0.2277 acc=0.9283
[MLP] epoch 10 | train_loss=0.0540 acc=0.9879 | val_loss=0.2262 acc=0.9317
[MLP] epoch 11 | train_loss=0.0401 acc=0.9933 | val_loss=0.2723 acc=0.9183
[MLP] epoch 12 | train_loss=0.0320 acc=0.9938 | val_loss=0.2695 acc=0.9250
[MLP] epoch 13 | train_loss=0.0207 acc=0.9975 | val_loss=0.2461 acc=0.9400
[MLP] epoch 14 | train_lo


## Extras (optional)

### 1) t-SNE of the *embeddings*
Projects the *features* (taken before the last layer) to 2D with t-SNE and plots them.

### 2) Salt-and-pepper noise and *data augmentation*
Enable the cell to rebuild the *loaders* with a transform that injects noise.

### 3) Class imbalance
Shows how to reduce the number of samples for some classes.
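
One possible way to build an imbalanced index list is sketched below on toy labels. This is an assumption-laden illustration, not the notebook's own code: the function `imbalanced_indices`, its parameters, and the choice of classes 3 and 8 are all hypothetical, and it uses the standard library `random` module to stay dependency-free.

```python
import random
from collections import Counter

def imbalanced_indices(labels, keep_per_class, default_keep, seed=0):
    """Keep keep_per_class[c] examples of class c, or default_keep if c is not listed."""
    rng = random.Random(seed)
    by_class = {}
    for idx, label in enumerate(labels):
        by_class.setdefault(label, []).append(idx)
    chosen = []
    for label, idxs in sorted(by_class.items()):
        rng.shuffle(idxs)                                        # random pick within the class
        chosen.extend(idxs[:keep_per_class.get(label, default_keep)])
    rng.shuffle(chosen)
    return chosen

toy_labels = [c for c in range(10) for _ in range(20)]   # 20 examples per digit
picked = imbalanced_indices(toy_labels, keep_per_class={3: 2, 8: 5}, default_keep=20)
class_counts = Counter(toy_labels[i] for i in picked)
print(sorted(class_counts.items()))
```

An index list built this way could then be wrapped with `Subset(full_train, picked)` in place of the balanced `chosen_indices`, with the split and loaders rebuilt as in the data cell.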

### 4) Grad-CAM (CNN)
A lightweight implementation for visualizing activation regions.
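
For reference, the computation carried out by the `GradCAM` class further down can be summarized as follows. Here $A^k$ denotes channel $k$ of the `conv2` output (captured by the forward hook, before the ReLU), $y^c$ is the logit of the target class $c$, and $H \times W$ is the spatial size of that feature map. This is a reading of the code as written, not a statement about every Grad-CAM variant.

$$
\alpha_k = \frac{1}{HW}\sum_{i=1}^{H}\sum_{j=1}^{W}\frac{\partial y^c}{\partial A^k_{ij}},
\qquad
M^c = \mathrm{ReLU}\Big(\sum_k \alpha_k A^k\Big),
\qquad
\hat{M}^c = \frac{M^c - \min M^c}{\max M^c - \min M^c + 10^{-8}}
$$

The last step is the per-sample min-max normalization to $[0, 1]$; the small constant only guards against division by zero when a map is constant.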


In [9]:

@torch.no_grad()
def extract_features(model, loader, is_cnn):
    model.eval()
    feats, labels = [], []
    for x, y in loader:
        x = x.to(DEVICE)
        if is_cnn:
            # Features before the last FC layer
            x1 = F.relu(model.conv1(x)); x1 = model.pool(x1)
            x2 = F.relu(model.conv2(x1)); x2 = model.pool(x2)
            flat = torch.flatten(x2, 1)
            f = F.relu(model.fc1(flat))
        else:
            # Penultimate features: every layer except the final classifier (Flatten is part of net)
            f = model.net[:-1](x)
        feats.append(f.cpu().numpy())
        labels.append(y.numpy())
    return np.vstack(feats), np.concatenate(labels)

def tsne_plot(model, is_cnn, prefix="mlp"):
    X, y = extract_features(model, test_loader, is_cnn)
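    # Iteration count is left at the default (1000); the argument was renamed from n_iter to max_iter in scikit-learn 1.5.
    tsne = TSNE(n_components=2, init="random", perplexity=30, learning_rate="auto", verbose=1)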
    Z = tsne.fit_transform(X)
    fig = plt.figure()
    for c in range(10):
        m = y==c
        plt.scatter(Z[m,0], Z[m,1], s=5, label=str(c))
    plt.title(f"t-SNE features — {prefix.upper()}")
    plt.legend(markerscale=3, bbox_to_anchor=(1.05,1), loc="upper left")
    fig.savefig(OUT_FIG / f"tsne_{prefix}.png", bbox_inches="tight")
    plt.close(fig)

# Example usage (optional; may take a few minutes):
# tsne_plot(mlp, is_cnn=False, prefix="mlp")
# tsne_plot(cnn, is_cnn=True,  prefix="cnn")


In [10]:

# Example of salt-and-pepper noise
from PIL import Image

class SaltPepper(object):
    def __init__(self, amount=0.05):
        self.amount = amount  # fraction of pixels to corrupt
    def __call__(self, img):
        x = np.array(img)
        m = rng.random(x.shape) < self.amount        # mask of pixels to corrupt
        sp = rng.integers(0, 2, size=x.shape) * 255  # 0 (pepper) or 255 (salt)
        x = x.copy()
        x[m] = sp[m]
        return Image.fromarray(x.astype(np.uint8))

# To enable it, rebuild the datasets/loaders (example):
# aug_transform = transforms.Compose([SaltPepper(0.05), transforms.ToTensor(), transforms.Normalize(mean, std)])
# full_train_aug = datasets.MNIST(root=DATA_DIR, train=True, download=True, transform=aug_transform)
# (redo the subset/split/loader logic starting from full_train_aug)


In [None]:

# Very simple Grad-CAM for the last conv layer of the CNN
class GradCAM:
    def __init__(self, model):
        self.model = model
        self.gradients = None
        self.activations = None
        self.h1 = model.conv2.register_forward_hook(self._forward_hook)
        self.h2 = model.conv2.register_full_backward_hook(self._backward_hook)
    def _forward_hook(self, module, inp, out):
        self.activations = out.detach()
    def _backward_hook(self, module, grad_input, grad_output):
        self.gradients = grad_output[0].detach()
    def __call__(self, x, class_idx=None):
        self.model.zero_grad(set_to_none=True)
        logits = self.model(x)
        if class_idx is None:
            class_idx = logits.argmax(1)
        loss = logits[range(x.size(0)), class_idx].sum()
        loss.backward()
        grads = self.gradients # [B, C, H, W]
        acts  = self.activations
        weights = grads.mean(dim=(2,3), keepdim=True)
        cam = (weights * acts).sum(dim=1, keepdim=True)
        cam = F.relu(cam)
        # normalize each map to [0, 1]
        cam -= cam.min(dim=2, keepdim=True)[0].min(dim=3, keepdim=True)[0]
        cam /= (cam.max(dim=2, keepdim=True)[0].max(dim=3, keepdim=True)[0] + 1e-8)
        return cam
    def remove(self):
        # Detach both hooks once the maps are no longer needed
        self.h1.remove(); self.h2.remove()

# Example usage (after training the CNN):
# gradcam = GradCAM(cnn)
# x, y = next(iter(test_loader))
# x = x.to(DEVICE)[:8]
# cams = gradcam(x)  # [B,1,14,14] (conv2 output resolution) -> upsample to 28x28 to overlay on the input
# gradcam.remove()
