In [1]:
# ---------------------------
# Abeni's Real-Time Emotion Detection Model made with PyTorch, Mediapipe, ResNet-18 and OpenCV
# (Without CLAHE and Adaptive Thresholding)
#---------------------------

# =============================================
# 1. All Imports
# =============================================
import os
import numpy as np
import pandas as pd
import cv2
import mediapipe as mp
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models import resnet18, ResNet18_Weights

# =============================================
# 2. Dataset Definition
# =============================================
class FER2013Dataset(Dataset):
    """One usage split of a FER-2013 style CSV, served as (image, label) pairs.

    The CSV is expected to provide the columns ``Usage``, ``pixels`` and
    ``emotion``; only rows whose ``Usage`` equals the requested split are kept.
    Images are decoded lazily, one row per ``__getitem__`` call.
    """

    def __init__(self, csv_file, usage="Training", transform=None):
        """Read ``csv_file`` and keep the rows belonging to ``usage``.

        Args:
            csv_file: Anything ``pandas.read_csv`` accepts.
            usage: Value of the ``Usage`` column to select.
            transform: Optional callable applied to each decoded image.
        """
        frame = pd.read_csv(csv_file)
        in_split = frame["Usage"] == usage
        # Re-index from zero so positions run 0..len-1 after filtering.
        self.data = frame[in_split].reset_index(drop=True)
        self.transform = transform

    def __len__(self):
        """Return the number of rows in the selected split."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the row at position ``idx``.

        The space-separated ``pixels`` string is parsed into a 48x48 uint8
        grayscale image and expanded to three channels; ``transform`` (when
        truthy) is applied to that RGB array. The label is the row's
        ``emotion`` value as an ``int``.
        """
        record = self.data.iloc[idx]
        flat = np.fromstring(record["pixels"], dtype=int, sep=' ')
        gray = flat.reshape(48, 48).astype(np.uint8)
        rgb = cv2.cvtColor(gray, cv2.COLOR_GRAY2RGB)
        image = self.transform(rgb) if self.transform else rgb
        return image, int(record["emotion"])

# =============================================
# 3. Data Transforms
# =============================================
# Per-channel normalisation statistics matching the ImageNet-pretrained backbone.
imagenet_mean, imagenet_std = [0.485, 0.456, 0.406], [0.229, 0.224, 0.225]


def _resize_and_normalize():
    """Return fresh instances of the deterministic steps every pipeline ends with."""
    return [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        transforms.Normalize(mean=imagenet_mean, std=imagenet_std),
    ]


# Random augmentations, applied to training images only (before resizing).
_train_augmentations = [
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(15),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
]

# Training pipeline: ndarray -> PIL -> augment -> resize -> tensor -> normalise.
train_tfm = transforms.Compose(
    [transforms.ToPILImage(), *_train_augmentations, *_resize_and_normalize()]
)

# Validation/test pipeline: same as training minus the random augmentations.
val_tfm = transforms.Compose(
    [transforms.ToPILImage(), *_resize_and_normalize()]
)

# =============================================
# 4. Data Loaders
# =============================================
# One dataset per FER-2013 usage split; only the training split is augmented.
_fer_csv = 'fer2013.csv'
train_ds, val_ds, test_ds = (
    FER2013Dataset(_fer_csv, usage=split, transform=tfm)
    for split, tfm in (
        ('Training', train_tfm),
        ('PublicTest', val_tfm),
        ('PrivateTest', val_tfm),
    )
)

# Settings shared by all loaders; batches are loaded in the main process.
_loader_kwargs = dict(batch_size=32, num_workers=0)

# Only the training loader shuffles, so validation/test batches are repeatable.
train_loader = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
val_loader = DataLoader(val_ds, shuffle=False, **_loader_kwargs)
test_loader = DataLoader(test_ds, shuffle=False, **_loader_kwargs)

# =============================================
# 5. Model Definition (unfreezing layers 1 to 4 + fc)
# =============================================
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')
model = resnet18(weights=ResNet18_Weights.IMAGENET1K_V1)

# Parameters under these prefixes stay trainable; every other parameter
# (i.e. the stem: conv1 / bn1) is frozen.
# NOTE(review): freezing only disables gradient updates; a frozen BatchNorm
# layer still refreshes its running statistics in train mode — confirm intended.
_trainable_prefixes = ('layer1', 'layer2', 'layer3', 'layer4', 'fc')
for name, param in model.named_parameters():
    if name.startswith(_trainable_prefixes):
        continue
    param.requires_grad = False

# Swap the 1000-way ImageNet head for a 7-way head (newly created, so trainable).
num_ftrs = model.fc.in_features
model.fc = nn.Linear(num_ftrs, 7)
model = model.to(device)

# List what will actually be optimised as a quick sanity check.
print("Trainable parameters (should list layer1–4 and fc):")
_trainable_names = [n for n, p in model.named_parameters() if p.requires_grad]
for name in _trainable_names:
    print(f"  {name}")

# =============================================
# 6. Training Setup (separate LRs + scheduler)
# =============================================
# Multi-class classification loss over the 7 output logits.
criterion = nn.CrossEntropyLoss()

# The pretrained residual stages get a small learning rate; the freshly
# initialised classifier head gets a 10x larger one.
_backbone_lr, _head_lr = 1e-5, 1e-4
params_to_update = [
    {'params': getattr(model, stage).parameters(), 'lr': _backbone_lr}
    for stage in ('layer1', 'layer2', 'layer3', 'layer4')
]
params_to_update.append({'params': model.fc.parameters(), 'lr': _head_lr})

optimizer = optim.Adam(params_to_update)
# Multiply every group's LR by 0.1 after each 10 scheduler steps.
scheduler = optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# =============================================
# 7. Training Loop (fine‐tuning all of layers 1 to 4 + fc)
# =============================================
def _run_epoch(model, loader, criterion, device, optimizer=None):
    """Run one full pass over ``loader`` and return ``(mean_loss, accuracy)``.

    When ``optimizer`` is given the model is put in train mode and updated
    after every batch; otherwise it is put in eval mode and gradients are
    disabled. Both metrics are averaged over the samples actually seen, so
    they stay correct with ``drop_last=True`` or a custom sampler.

    Raises:
        ValueError: If ``loader`` yields no samples.
    """
    training = optimizer is not None
    model.train(training)
    total_loss, total_correct, seen = 0.0, 0, 0

    with torch.set_grad_enabled(training):
        for inputs, labels in loader:
            inputs, labels = inputs.to(device), labels.to(device)
            if training:
                optimizer.zero_grad()
            outputs = model(inputs)
            loss = criterion(outputs, labels)
            if training:
                loss.backward()
                optimizer.step()
            batch_size = inputs.size(0)
            # criterion returns a batch mean; re-weight it by the batch size.
            total_loss += loss.item() * batch_size
            total_correct += (outputs.argmax(1) == labels).sum().item()
            seen += batch_size

    if seen == 0:
        raise ValueError("loader yielded no samples")
    return total_loss / seen, total_correct / seen


def train_full_resnet(model, train_loader, val_loader, criterion, optimizer, scheduler,
                      epochs=2, checkpoint_path='full_resnet_emotion.pth', device=None):
    """Train ``model`` and keep the weights with the best validation accuracy.

    Each epoch runs one training pass, one scheduler step and one validation
    pass. Whenever validation accuracy improves (and always after the first
    epoch) the weights are written to ``checkpoint_path``. On return the
    model holds the best weights rather than the last epoch's, and is left in
    eval mode.

    Args:
        model: Network to train, already on its target device.
        train_loader: Iterable of ``(inputs, labels)`` training batches.
        val_loader: Iterable of ``(inputs, labels)`` validation batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer updating the model's parameters.
        scheduler: Object whose ``step()`` is called once per epoch.
        epochs: Number of epochs to run.
        checkpoint_path: Destination file for the best ``state_dict``.
        device: Device batches are moved to; defaults to the device of the
            model's first parameter (CPU if the model has no parameters).

    Returns:
        The best validation accuracy observed (``0.0`` if no epoch ran).
    """
    import copy

    if device is None:
        first_param = next(model.parameters(), None)
        device = first_param.device if first_param is not None else torch.device('cpu')

    best_acc = 0.0
    best_state = None

    for epoch in range(1, epochs + 1):
        epoch_loss, epoch_acc = _run_epoch(model, train_loader, criterion, device, optimizer)
        scheduler.step()
        val_loss, val_acc = _run_epoch(model, val_loader, criterion, device)

        print(
            f"Epoch {epoch}/{epochs}  "
            f"Train L: {epoch_loss:.4f}, Acc: {epoch_acc:.4f}  |  "
            f"Val L: {val_loss:.4f}, Acc: {val_acc:.4f}"
        )

        # Snapshot on the first epoch too, so a checkpoint always exists
        # after training even if validation accuracy never rises above 0.
        if best_state is None or val_acc > best_acc:
            best_acc = val_acc
            # Deep copy: state_dict() tensors alias the live parameters.
            best_state = copy.deepcopy(model.state_dict())
            torch.save(best_state, checkpoint_path)

    if best_state is None:
        print("No epochs were run; no checkpoint written.")
    else:
        # Restore the best epoch so the in-memory model matches the file.
        model.load_state_dict(best_state)
        print(f"Best Val Acc: {best_acc:.4f} (weights saved to {checkpoint_path})")

    return best_acc

# =============================================
# 8. Run Training & Save Weights
# =============================================
# When True (the default), any existing checkpoint for this model is deleted
# and training runs again. Set to False to reuse a previously saved
# 'full_resnet_emotion.pth' instead of retraining.
RETRAIN_FROM_SCRATCH = True

checkpoint_path = 'full_resnet_emotion.pth'

# Checkpoint files that are always removed before running.
stale_checkpoints = ['only_fc_emotion_resnet.pth', 'layer2_to_fc_emotion_resnet.pth']
if RETRAIN_FROM_SCRATCH:
    # Deleting the current checkpoint forces the training branch below;
    # without this flag the loading branch could never be reached.
    stale_checkpoints.append(checkpoint_path)

for old_fn in stale_checkpoints:
    if os.path.exists(old_fn):
        os.remove(old_fn)

if not os.path.exists(checkpoint_path):
    train_full_resnet(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs=2)
else:
    print(f"Found checkpoint '{checkpoint_path}', loading weights...")
    model.load_state_dict(torch.load(checkpoint_path, map_location=device))

# Everything after this point is inference only.
model.eval()

# =============================================
# 9. Real-Time Inference 
# =============================================
# MediaPipe face detector used to locate faces in incoming frames.
# NOTE(review): model_selection=1 is presumably MediaPipe's full-range model —
# confirm against the MediaPipe docs for the intended camera distance.
mp_face = mp.solutions.face_detection
face_detection = mp_face.FaceDetection(model_selection=1, min_detection_confidence=0.6)

# Class index -> human-readable label, in the order of the model's 7 outputs.
emotion_map = dict(enumerate(
    ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']
))

# Inference reuses the validation pipeline (no random augmentation).
infer_tfm = val_tfm

import torch.nn.functional as F

# --- OPTIONAL STATIC SANITY CHECK THAT I WANTED TO TRY ON THE FER-2013 DATASET ---

print("\n=== Running static FER-2013 test batch check ===")
model.eval()
# Only the first batch of test_loader is inspected.
imgs, labels = next(iter(test_loader))
imgs, labels = imgs.to(device), labels.to(device)

with torch.no_grad():
    logits = model(imgs)
    probs  = F.softmax(logits, dim=1)   # per-class probabilities for each sample
    preds  = probs.argmax(dim=1)        # most likely class index per sample

# Pad label names to the longest one so the printed columns line up
# (a fixed width of 7 is too narrow for 'Surprise').
label_width = max(len(label_name) for label_name in emotion_map.values())

# Print up to five examples; the bound avoids an IndexError on a short batch.
for i in range(min(5, labels.size(0))):
    gt_idx   = labels[i].item()
    pred_idx = preds[i].item()
    prob_vec = probs[i].cpu().numpy().round(2)
    print(
        f"Sample {i}: GT={emotion_map[gt_idx]:{label_width}s}  "
        f"Pred={emotion_map[pred_idx]:{label_width}s}  "
        f"Probs={prob_vec}"
    )

# Fraction of this single batch that was classified correctly.
acc = (preds == labels).sum().item() / labels.size(0)
print(f"Static‐batch accuracy on FER-2013 test batch: {acc*100:.2f}%\n")

Trainable parameters (should list layer1–4 and fc):
  layer1.0.conv1.weight
  layer1.0.bn1.weight
  layer1.0.bn1.bias
  layer1.0.conv2.weight
  layer1.0.bn2.weight
  layer1.0.bn2.bias
  layer1.1.conv1.weight
  layer1.1.bn1.weight
  layer1.1.bn1.bias
  layer1.1.conv2.weight
  layer1.1.bn2.weight
  layer1.1.bn2.bias
  layer2.0.conv1.weight
  layer2.0.bn1.weight
  layer2.0.bn1.bias
  layer2.0.conv2.weight
  layer2.0.bn2.weight
  layer2.0.bn2.bias
  layer2.0.downsample.0.weight
  layer2.0.downsample.1.weight
  layer2.0.downsample.1.bias
  layer2.1.conv1.weight
  layer2.1.bn1.weight
  layer2.1.bn1.bias
  layer2.1.conv2.weight
  layer2.1.bn2.weight
  layer2.1.bn2.bias
  layer3.0.conv1.weight
  layer3.0.bn1.weight
  layer3.0.bn1.bias
  layer3.0.conv2.weight
  layer3.0.bn2.weight
  layer3.0.bn2.bias
  layer3.0.downsample.0.weight
  layer3.0.downsample.1.weight
  layer3.0.downsample.1.bias
  layer3.1.conv1.weight
  layer3.1.bn1.weight
  layer3.1.bn1.bias
  layer3.1.conv2.weight
  layer3.1.bn2.w

I0000 00:00:1748891904.338700 19963585 gl_context.cc:357] GL version: 2.1 (2.1 Metal - 88), renderer: Apple M1
INFO: Created TensorFlow Lite XNNPACK delegate for CPU.
W0000 00:00:1748891904.377072 20092428 inference_feedback_manager.cc:114] Feedback manager requires a model with a single signature inference. Disabling support for feedback tensors.



=== Running static FER-2013 test batch check ===
Sample 0: GT=Angry    Pred=Fear     Probs=[0.22 0.02 0.34 0.01 0.26 0.03 0.13]
Sample 1: GT=Surprise  Pred=Sad      Probs=[0.06 0.   0.27 0.03 0.5  0.04 0.1 ]
Sample 2: GT=Neutral  Pred=Sad      Probs=[0.08 0.   0.29 0.02 0.43 0.01 0.16]
Sample 3: GT=Sad      Pred=Fear     Probs=[0.21 0.   0.34 0.   0.34 0.05 0.06]
Sample 4: GT=Fear     Pred=Angry    Probs=[0.37 0.02 0.27 0.01 0.27 0.05 0.02]
Static‐batch accuracy on FER-2013 test batch: 46.88%

