
# DCGAN + SqueezeNet (Discriminador) con FashionMNIST

Este cuadernillo implementa una **DCGAN** donde:
- El **Generador** es una red deconvolucional clásica (salida 1×64×64 en `[-1, 1]`).
- El **Discriminador** usa **SqueezeNet** *preentrenada en ImageNet* como *backbone*.
  - Repetimos el canal (de 1 a 3) y **redimensionamos** a 224×224 para que
    encaje con SqueezeNet.
  - Reemplazamos la cabeza de clasificación por un único *logit* (real/falso).
  - Opción de congelar características y afinar sólo la cabeza, o
    *descongelar* gradualmente capas superiores.

**Dataset:** `FashionMNIST` (28×28 gris). Si el *download* falla, el código puede alternar a `KMNIST` automáticamente.

> **Nota:** Usar un backbone grande (SqueezeNet) para discriminar imágenes simples (64×64, 1 canal) es pedagógico y *funciona*, pero es más pesado que un discriminador DCGAN clásico. Gana robustez a texturas/patrones gracias al preentrenamiento.


In [None]:

# !pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu121
# ↑ Descomenta si estás en un entorno sin PyTorch (ajusta CUDA/CPU según tu caso).

import os, math, time, random
from pathlib import Path

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, utils as vutils, models

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# ------------------------
# Configuración principal
# ------------------------
cfg = {
    "data_root": "./data",
    "batch_size": 128,
    "num_workers": 2,
    "image_size_g": 64,      # tamaño del Generador (1x64x64)
    "nz": 128,               # dimensión del vector latente z
    "ngf": 64,               # canales base del Generador
    "lr_g": 2e-4,
    "lr_d": 1e-4,            # tip: D ligeramente más lento al usar backbone grande
    "beta1": 0.5,
    "beta2": 0.999,
    "epochs": 20,
    "save_every": 5,
    "samples_dir": "./samples_dcgan_sqz",
    "chkpt_dir": "./checkpoints_dcgan_sqz",
    "freeze_squeezenet": True,    # comienza congelando el backbone
    "unfreeze_after": 10,         # descongela a partir de esta época (None para nunca)
    "use_amp": True               # entrenamiento mixto (si hay CUDA)
}

Path(cfg["samples_dir"]).mkdir(parents=True, exist_ok=True)
Path(cfg["chkpt_dir"]).mkdir(parents=True, exist_ok=True)

g_seed = 1337
random.seed(g_seed)
torch.manual_seed(g_seed)
if device.type == "cuda":
    torch.cuda.manual_seed_all(g_seed)


In [None]:

# ------------------------
# Dataset: FashionMNIST
# ------------------------
# El generador produce 1x64x64, así que escalamos reales a 64 y normalizamos a [-1, 1].
# Para el discriminador (SqueezeNet) convertiremos en vuelo a 3x224x224.

transform_64 = transforms.Compose([
    transforms.Resize(cfg["image_size_g"]),
    transforms.CenterCrop(cfg["image_size_g"]),
    transforms.ToTensor(),
    transforms.Normalize([0.5], [0.5])  # -> [-1, 1]
])

def get_dataset():
    tried = []
    try:
        ds = datasets.FashionMNIST(cfg["data_root"], train=True, download=True, transform=transform_64)
        name = "FashionMNIST"
        return ds, name
    except Exception as e:
        tried.append(("FashionMNIST", str(e)))
        try:
            ds = datasets.KMNIST(cfg["data_root"], train=True, download=True, transform=transform_64)
            name = "KMNIST"
            return ds, name
        except Exception as e2:
            tried.append(("KMNIST", str(e2)))
            raise RuntimeError(f"No se pudo descargar ninguno de los datasets: {tried}")

train_ds, ds_name = get_dataset()
print("Dataset:", ds_name, "Tamaño:", len(train_ds))

train_loader = DataLoader(train_ds, batch_size=cfg["batch_size"], shuffle=True,
                          num_workers=cfg["num_workers"], pin_memory=(device.type=="cuda"))


In [None]:

# ------------------------
# Modelo: Generador DCGAN (salida 1x64x64)
# ------------------------
class Generator(nn.Module):
    def __init__(self, nz=128, ngf=64, out_ch=1):
        super().__init__()
        self.net = nn.Sequential(
            # nz x 1 x 1  -> (ngf*8) x 4 x 4
            nn.ConvTranspose2d(nz, ngf*8, 4, 1, 0, bias=False),
            nn.BatchNorm2d(ngf*8),
            nn.ReLU(True),

            # (ngf*8) x 4 x 4 -> (ngf*4) x 8 x 8
            nn.ConvTranspose2d(ngf*8, ngf*4, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf*4),
            nn.ReLU(True),

            # -> (ngf*2) x 16 x 16
            nn.ConvTranspose2d(ngf*4, ngf*2, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf*2),
            nn.ReLU(True),

            # -> (ngf) x 32 x 32
            nn.ConvTranspose2d(ngf*2, ngf, 4, 2, 1, bias=False),
            nn.BatchNorm2d(ngf),
            nn.ReLU(True),

            # -> out_ch x 64 x 64
            nn.ConvTranspose2d(ngf, out_ch, 4, 2, 1, bias=False),
            nn.Tanh()
        )

    def forward(self, z):
        return self.net(z)

G = Generator(cfg["nz"], cfg["ngf"], out_ch=1).to(device)
print(G)


In [None]:

# ------------------------
# Discriminador con SqueezeNet preentrenada
# ------------------------
# - Repite canal 1->3 y reescala a 224x224 en el forward.
# - Normaliza con medias/devs de ImageNet (esperado por SqueezeNet).
# - Reemplaza la cabeza por un logit.
class DiscriminatorSqueezeNet(nn.Module):
    def __init__(self, freeze_backbone=True):
        super().__init__()
        sqz = models.squeezenet1_1(weights=models.SqueezeNet1_1_Weights.IMAGENET1K_V1)
        self.features = sqz.features  # extractor
        # reemplazar clasificador por un conv 1x1 a 1 canal (logit)
        self.classifier = nn.Sequential(
            nn.Dropout(p=0.5),
            nn.Conv2d(512, 1, kernel_size=1),
            nn.Flatten(),
        )
        # congelar backbone si se desea
        for p in self.features.parameters():
            p.requires_grad = not freeze_backbone

        # Imagenet stats
        self.register_buffer("mean", torch.tensor([0.485, 0.456, 0.406]).view(1,3,1,1))
        self.register_buffer("std",  torch.tensor([0.229, 0.224, 0.225]).view(1,3,1,1))

    def preprocess_to_224(self, x_1x64x64):
        # x: (B,1,64,64) en [-1,1]
        x = (x_1x64x64 + 1)/2.0  # -> [0,1]
        x = x.repeat(1,3,1,1)    # 1->3 canales
        x = F.interpolate(x, size=(224,224), mode="bilinear", align_corners=False)
        # normalizar para SqueezeNet
        x = (x - self.mean) / self.std
        return x

    def forward(self, x):
        x = self.preprocess_to_224(x)
        feat = self.features(x)           # (B,512,13,13)
        out = self.classifier(feat)       # (B, 1*13*13) -> tras Flatten
        # Global average pool implícito en SqueezeNet original, aquí usamos conv+flatten
        # Opcional: hacer media espacial manual para dejar un solo logit por imagen:
        out = out.mean(dim=1, keepdim=True)  # (B,1)
        return out

D = DiscriminatorSqueezeNet(freeze_backbone=cfg["freeze_squeezenet"]).to(device)
print("Parametros D (entrenables):", sum(p.numel() for p in D.parameters() if p.requires_grad))


In [None]:

criterion = nn.BCEWithLogitsLoss()

optG = optim.Adam(G.parameters(), lr=cfg["lr_g"], betas=(cfg["beta1"], cfg["beta2"]))
optD = optim.Adam(filter(lambda p: p.requires_grad, D.parameters()),
                  lr=cfg["lr_d"], betas=(cfg["beta1"], cfg["beta2"]))

scaler = torch.cuda.amp.GradScaler(enabled=(cfg["use_amp"] and device.type=="cuda"))

def sample_fixed_grid(G, nz=128, nrow=8, fname="sample.png"):
    G.eval()
    with torch.no_grad():
        z = torch.randn(nrow*nrow, nz, 1, 1, device=device)
        fakes = G(z).cpu()  # en [-1,1]
        vutils.save_image((fakes+1)/2, fname, nrow=nrow)
    G.train()

def save_ckpt(epoch):
    torch.save({
        "G": G.state_dict(),
        "D": D.state_dict(),
        "optG": optG.state_dict(),
        "optD": optD.state_dict(),
        "epoch": epoch,
        "cfg": cfg
    }, os.path.join(cfg["chkpt_dir"], f"dcgan_sqz_ep{epoch}.pt"))


In [None]:

real_label = 1.0
fake_label = 0.0

fixed_out = os.path.join(cfg["samples_dir"], "fixed_grid_ep0.png")
sample_fixed_grid(G, cfg["nz"], nrow=8, fname=fixed_out)
print("Muestra inicial guardada en:", fixed_out)

start_time = time.time()
for epoch in range(1, cfg["epochs"]+1):
    if cfg["unfreeze_after"] is not None and epoch == cfg["unfreeze_after"]:
        # Descongelar backbone para *fine-tuning suave*
        for p in D.features.parameters():
            p.requires_grad = True
        optD = optim.Adam(D.parameters(), lr=cfg["lr_d"]*0.5, betas=(cfg["beta1"], cfg["beta2"]))
        print(f"[Epoch {epoch}] Descongelado parcial del backbone y lr_D reducido")

    for i, (imgs, _) in enumerate(train_loader):
        bsz = imgs.size(0)
        real = imgs.to(device, non_blocking=True)

        # --------------------
        # (1) Actualiza D
        # --------------------
        D.zero_grad(set_to_none=True)

        with torch.cuda.amp.autocast(enabled=(cfg["use_amp"] and device.type=="cuda")):
            # Reales
            logits_real = D(real)
            labels_real = torch.full((bsz,1), real_label, device=device)
            loss_D_real = criterion(logits_real, labels_real)

            # Falsas
            z = torch.randn(bsz, cfg["nz"], 1, 1, device=device)
            fake = G(z).detach()
            logits_fake = D(fake)
            labels_fake = torch.full((bsz,1), fake_label, device=device)
            loss_D_fake = criterion(logits_fake, labels_fake)

            loss_D = loss_D_real + loss_D_fake

        scaler.scale(loss_D).backward()
        scaler.step(optD)
        scaler.update()

        # --------------------
        # (2) Actualiza G
        # --------------------
        G.zero_grad(set_to_none=True)
        with torch.cuda.amp.autocast(enabled=(cfg["use_amp"] and device.type=="cuda")):
            z = torch.randn(bsz, cfg["nz"], 1, 1, device=device)
            fake = G(z)
            logits_fake_g = D(fake)
            labels_real_for_g = torch.full((bsz,1), real_label, device=device)
            loss_G = criterion(logits_fake_g, labels_real_for_g)

        scaler.scale(loss_G).backward()
        scaler.step(optG)
        scaler.update()

        if (i+1) % 100 == 0:
            print(f"[{epoch:03d}/{cfg['epochs']}] step {i+1:04d}/{len(train_loader)} "
                  f"lossD={loss_D.item():.3f} lossG={loss_G.item():.3f}")

    # Muestras y checkpoints por época
    out_path = os.path.join(cfg["samples_dir"], f"fakes_ep{epoch}.png")
    sample_fixed_grid(G, cfg["nz"], nrow=8, fname=out_path)
    print(f"Guardado grid de muestras: {out_path}")

    if epoch % cfg["save_every"] == 0:
        save_ckpt(epoch)

elapsed = (time.time()-start_time)/60
print(f"Entrenamiento terminado en {elapsed:.1f} min.")


In [None]:

# Visualización rápida de un batch real y uno falso
import matplotlib.pyplot as plt

def show_tensor_grid(x, title):
    grid = vutils.make_grid((x[:64].cpu()+1)/2, nrow=8)  # asume [-1,1]
    plt.figure(figsize=(6,6))
    plt.axis("off")
    plt.title(title)
    plt.imshow(grid.permute(1,2,0))

# batch real
real_batch = next(iter(train_loader))[0].to(device)
with torch.no_grad():
    z = torch.randn(64, cfg["nz"], 1, 1, device=device)
    fake_batch = G(z)

show_tensor_grid(real_batch, f"Reales ({ds_name})")
plt.show()

show_tensor_grid(fake_batch, "Falsas (G)")
plt.show()



## Notas y recomendaciones

- **Normalización**: El generador produce `[-1,1]`. Para SqueezeNet convertimos a `[0,1]`, replicamos canal y redimensionamos a `224×224`, luego normalizamos con las estadísticas de ImageNet.
- **Estabilidad**: Al inicio conviene **congelar** el backbone y entrenar sólo la cabeza. Más adelante, podemos **descongelar** capas superiores (`unfreeze_after`) para ganar potencia.
- **Ajustes útiles**:
  - Subir `epochs` a 50–100.
  - Bajar `batch_size` si tu GPU/CPU es limitada.
  - Probar `lr_d` más bajo o usar *label smoothing* ligero para reales (p.ej. `0.9`).
- **Alternativa**: Si prefieres un discriminador 100% DCGAN (ligero), reemplaza este por un CNN con *stride* y *LeakyReLU*. El objetivo de este cuaderno es mostrar cómo **reutilizar** un modelo **preentrenado** como discriminador.
