In [8]:
import os, math, copy, time
from pathlib import Path
import torch
from torch import nn, optim
from torch.utils.data import DataLoader
from torchvision import datasets, transforms, models
from collections import Counter
import numpy as np

# Paths
DATA_DIR = "./FER-2013"  # change if needed
TRAIN_DIR = os.path.join(DATA_DIR, "train")
TEST_DIR  = os.path.join(DATA_DIR, "test")

# Expected classes (alphabetical order for consistency)
CLASSES = ["angry","disgust","fear","happy","neutral","sad","surprise"]

# Transforms: resize to 48x48 and expand to 3 channels so pretrained weights can be used
train_tf = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),
    transforms.Resize((48,48)),
    transforms.RandomHorizontalFlip(p=0.5),
    transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5,0.5,0.5], std=[0.5,0.5,0.5]),
])

# Same pipeline as training, minus the random augmentations
test_tf = transforms.Compose([
    transforms.Grayscale(num_output_channels=3),
    transforms.Resize((48,48)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.5,0.5,0.5], std=[0.5,0.5,0.5]),
])

# Datasets
train_ds = datasets.ImageFolder(TRAIN_DIR, transform=train_tf)
test_ds  = datasets.ImageFolder(TEST_DIR,  transform=test_tf)

# Inspect the index -> class mapping derived from the training folders
idx_to_class = {v:k for k,v in train_ds.class_to_idx.items()}
found = sorted(idx_to_class.values())
print("Folders found in train:", found)

# The label order is only useful if it matches CLASSES (the order assumed
# when decoding predictions), so report any mismatch (missing folders,
# typos, different ordering) instead of silently continuing.
if list(train_ds.classes) != CLASSES:
    print("WARNING: train class order", list(train_ds.classes),
          "differs from expected CLASSES", CLASSES)
# Train and test must agree on label indices, otherwise the evaluation
# metrics compare predictions against differently numbered targets.
if test_ds.class_to_idx != train_ds.class_to_idx:
    print("WARNING: test class mapping", test_ds.class_to_idx,
          "differs from train class mapping", train_ds.class_to_idx)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Device:", device)

# DataLoaders (tune num_workers for your Colab instance; 2-4 usually works well).
# Pinned host memory is only requested when batches are copied to a CUDA device.
BATCH_SIZE = 128
PIN_MEMORY = device.type == "cuda"
train_dl = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True,  num_workers=2, pin_memory=PIN_MEMORY)
test_dl  = DataLoader(test_ds,  batch_size=BATCH_SIZE, shuffle=False, num_workers=2, pin_memory=PIN_MEMORY)
# Compute class weights for CrossEntropy (to counter class imbalance)
def compute_class_weights(dataset, num_classes):
    """Return inverse-frequency class weights as a float32 tensor.

    Args:
        dataset: object exposing ``samples``, an iterable of
            ``(path, class_idx)`` pairs.
        num_classes: number of classes; weights are produced for the
            indices ``0 .. num_classes - 1``.

    Returns:
        1-D ``torch.float32`` tensor of length ``num_classes`` where
        ``weight[i] = total / (count[i] * num_classes)``. Classes that have
        no samples get weight 0.0 instead of the ``inf`` a plain division
        would produce.
    """
    # dataset.samples holds (path, class_idx); count occurrences per class_idx
    counts = Counter(y for _, y in dataset.samples)
    freqs = np.array([counts[i] for i in range(num_classes)], dtype=np.float32)

    weights = np.zeros_like(freqs)
    present = freqs > 0
    # Divide only where the class actually occurs to avoid division by zero.
    weights[present] = freqs.sum() / (freqs[present] * num_classes)
    return torch.tensor(weights, dtype=torch.float32)

num_classes = len(train_ds.classes)
class_weights = compute_class_weights(train_ds, num_classes).to(device)
print("Class weights:", class_weights.cpu().numpy())

# Model: pretrained ResNet18
model = models.resnet18(pretrained=True)
# Replace the last layer so it outputs one logit per class
in_feats = model.fc.in_features
model.fc = nn.Linear(in_feats, num_classes)
model = model.to(device)

# Number of training epochs. Defined before the scheduler so the one-cycle
# schedule length and the epoch loop always use the same value.
EPOCHS = 20

# Optimizer and loss
# NOTE(review): OneCycleLR below drives the learning rate from max_lr, so
# `lr` is likely overridden once the scheduler is attached; confirm against
# the PyTorch docs.
lr = 1e-3
optimizer = optim.AdamW(model.parameters(), lr=lr, weight_decay=1e-4)
criterion = nn.CrossEntropyLoss(weight=class_weights)

# One-cycle scheduler (optional, but it helps)
steps_per_epoch = len(train_dl)  # the scheduler is stepped once per batch
scheduler = optim.lr_scheduler.OneCycleLR(optimizer, max_lr=3e-3,
                                          steps_per_epoch=steps_per_epoch,
                                          epochs=EPOCHS)

# Early-stopping state
patience = 5       # epochs without improvement tolerated before stopping
best_acc = 0.0     # best validation accuracy seen so far
best_state = None  # copy of the model weights at best_acc
no_improve = 0     # consecutive epochs without improvement

def evaluate(dataloader):
    """Evaluate the global `model` on every batch of `dataloader`.

    Switches the model to eval mode and runs without gradient tracking.

    Returns:
        (avg_loss, accuracy): the per-batch `criterion` value averaged with
        batch-size weighting, and the fraction of samples whose argmax
        prediction equals the target.
    """
    model.eval()
    n_seen = 0
    n_hit = 0
    weighted_loss = 0.0
    with torch.no_grad():
        for inputs, targets in dataloader:
            inputs = inputs.to(device, non_blocking=True)
            targets = targets.to(device, non_blocking=True)
            outputs = model(inputs)
            batch_loss = criterion(outputs, targets)
            batch_n = targets.size(0)
            n_hit += (outputs.argmax(1) == targets).sum().item()
            n_seen += batch_n
            # Scale by batch size so a smaller final batch is not over-weighted
            weighted_loss += batch_loss.item() * batch_n
    return weighted_loss / n_seen, n_hit / n_seen

# Train for up to EPOCHS epochs, checkpointing on improvement and stopping
# early after `patience` epochs without a better validation accuracy.
# NOTE(review): the "val" metrics are computed on test_dl, so the test split
# is also what selects the best checkpoint.
for epoch in range(1, EPOCHS+1):
    model.train()
    running_loss = 0.0
    start = time.time()
    for x, y in train_dl:
        x = x.to(device, non_blocking=True)
        y = y.to(device, non_blocking=True)
        optimizer.zero_grad(set_to_none=True)
        logits = model(x)
        loss = criterion(logits, y)
        loss.backward()
        optimizer.step()
        # The scheduler advances once per batch, not once per epoch
        if scheduler is not None:
            scheduler.step()
        running_loss += loss.item() * y.size(0)

    train_loss = running_loss / len(train_ds)
    val_loss, val_acc = evaluate(test_dl)
    dur = time.time() - start  # covers both the training pass and evaluation
    print(f"[{epoch:02d}] train_loss={train_loss:.4f} val_loss={val_loss:.4f} val_acc={val_acc:.4f} time={dur:.1f}s")

    if not (val_acc > best_acc):
        # No improvement this epoch: count it and stop once patience runs out
        no_improve += 1
        if no_improve >= patience:
            print("Early stopping.")
            break
        continue

    # New best accuracy: snapshot the weights and checkpoint them to disk
    best_acc, no_improve = val_acc, 0
    best_state = copy.deepcopy(model.state_dict())
    torch.save(best_state, "best_fer2013_resnet18.pt")

# Restore the best snapshot, if any epoch produced one
if best_state is not None:
    model.load_state_dict(best_state)

print("Best val_acc:", best_acc)

# Save the weights currently held by the model
torch.save(model.state_dict(), "best_fer2013_resnet18.pt")


Folders found in train: ['angry', 'disgust', 'fear', 'happy', 'neutral', 'sad', 'surprise']
Device: cuda
Class weights: [1.0266047  9.406619   1.0010461  0.56843877 0.82603943 0.8491275
 1.293373  ]
[01] train_loss=1.7078 val_loss=1.4166 val_acc=0.4745 time=176.7s
[02] train_loss=1.3820 val_loss=1.2900 val_acc=0.5258 time=19.2s
[03] train_loss=1.3313 val_loss=1.2596 val_acc=0.5320 time=19.5s
[04] train_loss=1.3360 val_loss=1.2614 val_acc=0.5064 time=21.8s
[05] train_loss=1.2744 val_loss=1.4103 val_acc=0.4714 time=25.2s
[06] train_loss=1.2720 val_loss=1.2540 val_acc=0.5213 time=22.6s
[07] train_loss=1.2244 val_loss=1.2175 val_acc=0.5343 time=21.8s
[08] train_loss=1.1982 val_loss=1.3475 val_acc=0.5059 time=21.1s
[09] train_loss=1.1578 val_loss=1.8018 val_acc=0.4262 time=21.1s
[10] train_loss=1.1553 val_loss=1.1791 val_acc=0.5598 time=21.2s
[11] train_loss=1.0235 val_loss=1.1146 val_acc=0.5857 time=21.4s
[12] train_loss=0.9611 val_loss=1.0870 val_acc=0.6031 time=22.1s
[13] train_loss=0.92

In [5]:
import cv2
import time
import torch
import numpy as np
from torchvision import transforms, models
from torch import nn

# ===================== CONFIG =====================
# Name of the weights file to load:
WEIGHTS_PATH = "best_fer2013_resnet18.pt"

# Class list (same order as used during training)
CLASSES = ["angry","disgust","fear","happy","neutral","sad","surprise"]

# Pick the preprocessing that MATCHES how the model was trained:
# - Trained with the original script: USE_IMAGENET_PREPROCESS = False
# - Trained with the patch (224 and ImageNet mean/std): USE_IMAGENET_PREPROCESS = True
USE_IMAGENET_PREPROCESS = False

# (Optional) Use a face detector to crop and classify only the face
USE_FACE_DETECTOR = True

# Confidence threshold (0..1) for showing a label. If None, the top-1 is always shown
CONFIDENCE_THRESHOLD = 0.0

# Device
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# ==================================================

# --------- Transforms consistent with training ---------
# Only the input size and the normalization statistics differ between the
# two modes; the ToTensor -> Normalize pipeline itself is shared.
if USE_IMAGENET_PREPROCESS:
    # Suggested patch: 224x224 + ImageNet mean/std
    imagenet_mean = [0.485, 0.456, 0.406]
    imagenet_std  = [0.229, 0.224, 0.225]
    IMG_SIZE = (224, 224)
    _normalize = transforms.Normalize(mean=imagenet_mean, std=imagenet_std)
else:
    # Original training code: 48x48 + mean/std = 0.5
    IMG_SIZE = (48, 48)
    _normalize = transforms.Normalize(mean=[0.5,0.5,0.5], std=[0.5,0.5,0.5])

preprocess = transforms.Compose([
    transforms.ToTensor(),
    _normalize,
])

# --------- Model (ResNet18 with the last layer resized to 7 classes) ---------
def build_model(num_classes=7):
    """Build a ResNet18 whose final fully connected layer has `num_classes` outputs.

    The network is created without pretrained weights; the caller is expected
    to load a trained state dict into it.

    Args:
        num_classes: number of output logits of the replaced `fc` layer.

    Returns:
        The constructed model (not moved to any device, not in eval mode).
    """
    # Called without arguments so no pretrained weights are requested; this
    # avoids the deprecated `pretrained=` keyword of newer torchvision releases.
    model = models.resnet18()
    model.fc = nn.Linear(model.fc.in_features, num_classes)
    return model

model = build_model(num_classes=len(CLASSES)).to(DEVICE)
# NOTE(review): torch.load deserializes with pickle; only load weight files
# you trust. Consider passing weights_only=True where the installed PyTorch
# version supports it.
state = torch.load(WEIGHTS_PATH, map_location=DEVICE)
model.load_state_dict(state)
model.eval()  # inference only

# --------- Face detector (optional) ---------
face_detector = None
if USE_FACE_DETECTOR:
    # Haar cascade shipped with OpenCV
    cascade_path = cv2.data.haarcascades + "haarcascade_frontalface_default.xml"
    face_detector = cv2.CascadeClassifier(cascade_path)
    # CascadeClassifier does not raise when the XML cannot be loaded; it just
    # stays empty. Disable detection in that case so callers that check
    # `face_detector is not None` fall back to whole-frame classification.
    if face_detector.empty():
        print("WARNING: could not load Haar cascade from", cascade_path,
              "- face detection disabled.")
        face_detector = None

# --------- Utilities ---------
def predict_emotion_bgr(face_bgr):
    """Classify the emotion shown in a BGR image.

    The image is converted to grayscale, replicated to three channels,
    resized to IMG_SIZE, passed through `preprocess`, and fed to the global
    `model` without gradient tracking.

    Args:
        face_bgr: image array in OpenCV BGR channel order.

    Returns:
        (label, prob, probs): the top-1 class name from CLASSES, its softmax
        probability as a float, and the full probability vector as a NumPy
        array.
    """
    with torch.no_grad():
        # BGR -> single-channel gray -> 3-channel image with identical planes
        mono = cv2.cvtColor(face_bgr, cv2.COLOR_BGR2GRAY)
        three_channel = cv2.cvtColor(mono, cv2.COLOR_GRAY2RGB)
        resized = cv2.resize(three_channel, IMG_SIZE, interpolation=cv2.INTER_LINEAR)

        # To tensor + normalize, then add the batch dimension
        batch = preprocess(resized).unsqueeze(0).to(DEVICE)

        scores = model(batch)
        distribution = torch.softmax(scores, dim=1)[0]
        top_prob, top_idx = torch.max(distribution, dim=0)
        label = CLASSES[top_idx.item()]
        return label, float(top_prob.item()), distribution.detach().cpu().numpy()

def draw_label(img, text, x, y):
    """Draw `text` in white over a filled black box, in place on `img`.

    The box sits just above the anchor point (x, y). If the anchor is too
    close to the top or left edge for the label to fit, it is shifted so the
    label stays inside the image instead of being drawn off-frame.

    Args:
        img: image array to draw on (modified in place).
        text: string to render.
        x, y: anchor coordinates in pixels (left edge / baseline area).
    """
    font = cv2.FONT_HERSHEY_SIMPLEX
    scale, thickness = 0.7, 2
    (tw, th), _ = cv2.getTextSize(text, font, scale, thickness)
    # The box spans from y - th - 8 up to y + 4, so y must be at least th + 8
    # for its top edge to be on-screen (e.g. a face touching the top border).
    x = max(int(x), 0)
    y = max(int(y), th + 8)
    cv2.rectangle(img, (x, y - th - 8), (x + tw + 8, y + 4), (0, 0, 0), -1)
    cv2.putText(img, text, (x + 4, y - 4), font, scale, (255, 255, 255), thickness, cv2.LINE_AA)

def _format_prediction(label, prob):
    """Return the overlay text for a prediction, or "..." when below the threshold."""
    if (CONFIDENCE_THRESHOLD is None) or (prob >= CONFIDENCE_THRESHOLD):
        return f"{label} ({prob*100:.1f}%)"
    return "..."


def main():
    """Run the webcam loop: detect faces, classify emotions, and display the result.

    Each frame is read from camera 0. When the face detector is enabled and
    finds faces, every face crop is classified and annotated; otherwise the
    whole frame is classified as a fallback. The measured frame rate is
    overlaid and refreshed every 10 frames. Press 'q' or ESC to quit.
    """
    cap = cv2.VideoCapture(0)
    if not cap.isOpened():
        print("No se pudo abrir la webcam.")
        return

    # FPS measurement state
    t0 = time.time()
    frames = 0
    fps_text = ""  # stays empty until the first 10-frame window has elapsed

    try:
        while True:
            ok, frame = cap.read()
            if not ok:
                break
            frames += 1

            display = frame.copy()

            # Face detection (optional)
            faces = []
            if USE_FACE_DETECTOR and face_detector is not None:
                gray_cam = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
                # Tunable parameters: scaleFactor and minNeighbors
                rects = face_detector.detectMultiScale(gray_cam, scaleFactor=1.2, minNeighbors=5, minSize=(60, 60))
                # Plain Python ints keep the slicing and drawing calls simple
                faces = [tuple(int(v) for v in rect) for rect in rects]

            if faces:
                # Classify every detected face
                for (x, y, w, h) in faces:
                    roi = frame[y:y+h, x:x+w]
                    label, prob, _ = predict_emotion_bgr(roi)
                    txt = _format_prediction(label, prob)

                    # Draw the face box and its label
                    cv2.rectangle(display, (x, y), (x+w, y+h), (0, 255, 0), 2)
                    draw_label(display, txt, x, y)
            else:
                # No faces: classify the whole frame (fallback)
                label, prob, _ = predict_emotion_bgr(frame)
                H, W = frame.shape[:2]
                txt = _format_prediction(label, prob)
                # Centered box, drawn only as a visual reference
                box_w, box_h = int(0.4*W), int(0.4*H)
                cx, cy = W//2, H//2
                x1, y1 = cx - box_w//2, cy - box_h//2
                x2, y2 = cx + box_w//2, cy + box_h//2
                cv2.rectangle(display, (x1, y1), (x2, y2), (255, 200, 0), 2)
                draw_label(display, txt, x1, y1)

            # FPS: re-measure every 10 frames and keep showing the last value
            # in between, so the overlay reflects the real processing rate.
            if frames % 10 == 0:
                now = time.time()
                elapsed = now - t0
                if elapsed > 0:
                    fps_text = f"FPS: {int(10.0 / elapsed)}"
                t0 = now

            # Show the frame
            if fps_text:
                draw_label(display, fps_text, 10, 30)
            cv2.imshow("FER2013 - Webcam Emotion", display)

            key = cv2.waitKey(1) & 0xFF
            if key == ord('q') or key == 27:  # 'q' or ESC to quit
                break
    finally:
        # Always free the camera and close windows, even if a frame raised
        cap.release()
        cv2.destroyAllWindows()

if __name__ == "__main__":
    main()