### Install Packages

In [None]:
# Install required packages
!pip install torch torchvision torchaudio gradio opencv-python datasets transformers scikit-learn

### Import Packages

In [None]:
# Import required packages
import torch
import torchvision.transforms as transforms
import torch.nn as nn
import torch.optim as optim
from torchvision import models
from torchvision.models import ResNet18_Weights
import gradio as gr
import cv2
from datasets import load_dataset
import numpy as np
from torch.utils.data import Dataset, DataLoader
from PIL import Image
import io
from sklearn.metrics import accuracy_score, f1_score
from collections import Counter
from tqdm import tqdm
import copy

### Load Dataset from HuggingFace Hub

In [None]:
# Fetch the FER2013 dataset from the HuggingFace Hub and pull out its two splits.
dataset = load_dataset('Jeneral/fer-2013')
train_ds, test_ds = dataset['train'], dataset['test']

# Report split sizes as a quick sanity check that the download succeeded.
print(f"Train samples: {len(train_ds)} | Test samples: {len(test_ds)}")

### Data Preprocessing

In [None]:
# Data Preprocessing
# Run on the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

class FERDataset(Dataset):
    """Torch dataset over a HuggingFace split whose rows hold 'img_bytes' and 'labels'.

    Each item is decoded to an RGB PIL image, histogram-equalized on its
    luminance (Y) channel with OpenCV, and then passed through ``transform``
    when one is given.

    Args:
        hf_dataset: Indexable collection of rows; each row must provide the
            encoded image under 'img_bytes' and the class index under 'labels'.
        transform: Optional callable applied to the equalized PIL image.
    """

    def __init__(self, hf_dataset, transform=None):
        self.dataset = hf_dataset
        self.transform = transform

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for row ``idx``."""
        # Look the row up once and reuse it for both the image and the label,
        # instead of indexing the underlying dataset twice per sample.
        sample = self.dataset[idx]
        img = Image.open(io.BytesIO(sample['img_bytes'])).convert('RGB')
        # OpenCV preprocessing: equalize only the luminance channel so contrast
        # is normalized without touching the chroma channels. Converting
        # RGB -> YUV directly avoids an intermediate RGB -> BGR copy.
        img_yuv = cv2.cvtColor(np.array(img), cv2.COLOR_RGB2YUV)
        img_yuv[:, :, 0] = cv2.equalizeHist(img_yuv[:, :, 0])
        img = Image.fromarray(cv2.cvtColor(img_yuv, cv2.COLOR_YUV2RGB))
        if self.transform:
            img = self.transform(img)
        return img, sample['labels']

# Data augmentation and normalization (32x32, only defined once)
# Channel statistics shared by the train and test pipelines (the standard
# ImageNet mean/std); defined once so the two pipelines cannot drift apart.
NORM_MEAN = [0.485, 0.456, 0.406]
NORM_STD = [0.229, 0.224, 0.225]
normalize = transforms.Normalize(NORM_MEAN, NORM_STD)

# Training pipeline: light augmentation, then tensor conversion + normalization.
train_transform = transforms.Compose([
    transforms.RandomHorizontalFlip(),
    transforms.RandomRotation(10),  # reduced
    transforms.RandomResizedCrop(32, scale=(0.9, 1.0)),  # less aggressive
    transforms.ToTensor(),
    normalize,
])

# Evaluation / inference pipeline: deterministic resize only, no augmentation.
test_transform = transforms.Compose([
    transforms.Resize((32, 32)),
    transforms.ToTensor(),
    normalize,
])

train_data = FERDataset(train_ds, transform=train_transform)
test_data = FERDataset(test_ds, transform=test_transform)

# The training loop moves batches with non_blocking=True; that only has an
# effect when the source tensors live in pinned host memory, so pin batches
# whenever a GPU is in use.
pin_memory = torch.cuda.is_available()
train_loader = DataLoader(train_data, batch_size=128, shuffle=True, num_workers=0, pin_memory=pin_memory)
test_loader = DataLoader(test_data, batch_size=128, shuffle=False, num_workers=0, pin_memory=pin_memory)


### Training Model - Definition Part

In [None]:
# Training Model - Definition Part (ResNet18, Adam optimizer, label smoothing, class weighting, early stopping at 80% accuracy, model checkpointing, model saving)
# Size the classifier head from the dataset's declared label names. load_model()
# and predict() in the Gradio cell use this same source to rebuild the head and
# to map indices to names, so training and inference always agree on the head
# size even if some class were absent from the training split.
num_classes = len(train_ds.features['labels'].names)
# Use ResNet18 for 32x32 images
model = models.resnet18(weights=ResNet18_Weights.DEFAULT)
# Adjust the stem for 32x32 input: a 3x3 / stride-1 first conv and no max-pool,
# so the tiny images are not downsampled before the residual stages.
# Note: this new conv1 is freshly initialized; the pretrained conv1 weights are
# replaced, while the remaining layers keep their pretrained weights.
model.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
model.maxpool = nn.Identity()
# Unfreeze all layers for full fine-tuning
for param in model.parameters():
    param.requires_grad = True
# Add dropout to the classifier head
model.fc = nn.Sequential(
    nn.Dropout(0.5),
    nn.Linear(model.fc.in_features, num_classes)
)
model = model.to(device)

# Compute class weights for imbalanced data (inverse class frequency: total / count)
# Count straight from the label column; iterating over whole rows would also
# load every row's image bytes just to read a single integer.
label_counts = Counter(train_ds['labels'])
total = sum(label_counts.values())
# Fail with a clear message instead of a bare ZeroDivisionError when a class
# index has no training samples.
missing_classes = [i for i in range(num_classes) if label_counts[i] == 0]
if missing_classes:
    raise ValueError(f"No training samples for class indices {missing_classes}; cannot compute class weights.")
class_weights = [total / label_counts[i] for i in range(num_classes)]
class_weights = torch.tensor(class_weights, dtype=torch.float).to(device)

# Label smoothing loss
class LabelSmoothingCrossEntropy(nn.Module):
    """Cross-entropy with uniform label smoothing, averaged over the batch.

    The per-sample loss blends the negative log-probability of the target
    class (weight ``1 - smoothing``) with the mean negative log-probability
    over all classes (weight ``smoothing``).

    Args:
        smoothing: Fraction of the loss assigned to the uniform term.
    """

    def __init__(self, smoothing=0.1):
        super().__init__()
        self.smoothing = smoothing

    def forward(self, pred, target):
        """Return the scalar loss for logits ``pred`` and class indices ``target``."""
        log_probs = torch.log_softmax(pred, dim=-1)
        # Negative log-likelihood of each sample's true class.
        target_term = -log_probs.gather(dim=-1, index=target.unsqueeze(1)).squeeze(1)
        # Cross-entropy against a uniform distribution over the classes.
        uniform_term = -log_probs.mean(dim=-1)
        per_sample = (1.0 - self.smoothing) * target_term + self.smoothing * uniform_term
        return per_sample.mean()

# Loss: label smoothing (0.05) combined with the inverse-frequency class
# weights computed above. Previously `class_weights` was built but never passed
# to the loss, so training was unweighted. nn.CrossEntropyLoss supports both
# `weight` and `label_smoothing` natively, whereas the LabelSmoothingCrossEntropy
# class defined in this cell accepts no class weights. With the default 'mean'
# reduction the loss is normalized by the summed weights of the batch targets,
# so the absolute scale of `class_weights` does not affect the loss magnitude.
criterion = nn.CrossEntropyLoss(weight=class_weights, label_smoothing=0.05)
optimizer = optim.AdamW(model.parameters(), lr=0.0005, weight_decay=1e-4)
# Halve the learning rate after 2 epochs without improvement in the monitored
# metric (mode='max': higher is better), never going below 1e-5.
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='max', factor=0.5, patience=2, min_lr=1e-5)

### Training Model - Execution Part

In [None]:
# Training Model - Execution Part
#
# Fine-tunes `model` with `criterion` / `optimizer`, scores a fixed slice of
# `test_loader` after every epoch, steps the LR scheduler on that accuracy and
# keeps the weights of the best-scoring epoch.
# NOTE: the per-epoch "validation" batches come from the test split, so the
# best accuracy reported here is an optimistic estimate of held-out performance.

# Reduce max epochs for faster experimentation
epochs = 20
# Only the first `val_batches` batches of test_loader are scored each epoch.
val_batches = 10
checkpoint_path = 'best_fer32_model.pth'
best_val_acc = 0
best_model_wts = copy.deepcopy(model.state_dict())

# Use torch.amp for mixed precision (PyTorch 2.0+). It is enabled only when
# CUDA is available; otherwise the plain full-precision path is used.
use_amp = torch.cuda.is_available()
scaler = torch.amp.GradScaler() if use_amp else None

for epoch in range(epochs):
    print(f'\nEpoch {epoch+1}/{epochs}')

    # ---- Training pass over the full training set ----
    model.train()
    running_loss = 0.0
    all_labels = []
    all_preds = []
    total_batches = len(train_loader)
    for batch_idx, (images, labels) in enumerate(tqdm(train_loader, desc=f'Training Epoch {epoch+1}', total=total_batches)):
        images, labels = images.to(device, non_blocking=True), labels.to(device, non_blocking=True)
        optimizer.zero_grad()
        if use_amp:
            with torch.amp.autocast(device_type='cuda'):
                outputs = model(images)
                loss = criterion(outputs, labels)
            # Scale the loss before backward and let the scaler drive the
            # optimizer step and its own scale update.
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()
        else:
            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
        running_loss += loss.item()
        preds = torch.argmax(outputs, 1).detach().cpu().numpy()
        all_preds.extend(preds)
        all_labels.extend(labels.detach().cpu().numpy())
    # Training metrics are accumulated from the train-mode forward passes above.
    acc = accuracy_score(all_labels, all_preds)
    f1 = f1_score(all_labels, all_preds, average='weighted')
    print(f"Epoch {epoch+1}, Loss: {running_loss/(batch_idx+1):.4f}, Accuracy: {acc:.4f}, F1: {f1:.4f}")

    # ---- Validation on the first `val_batches` batches of the test loader ----
    model.eval()
    val_labels = []
    val_preds = []
    with torch.no_grad():
        for i, (images, labels) in enumerate(test_loader):
            if i >= val_batches:
                break
            images, labels = images.to(device, non_blocking=True), labels.to(device, non_blocking=True)
            if use_amp:
                with torch.amp.autocast(device_type='cuda'):
                    outputs = model(images)
            else:
                outputs = model(images)
            preds = torch.argmax(outputs, 1).detach().cpu().numpy()
            val_preds.extend(preds)
            val_labels.extend(labels.detach().cpu().numpy())
    val_acc = accuracy_score(val_labels, val_preds)
    val_f1 = f1_score(val_labels, val_preds, average='weighted')
    print(f"Validation Accuracy: {val_acc:.4f}, Validation F1: {val_f1:.4f}")

    # Step scheduler based on validation accuracy
    scheduler.step(val_acc)

    # Save best model based on validation accuracy
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_model_wts = copy.deepcopy(model.state_dict())
        # Persist immediately so an interrupted run still leaves the best
        # weights seen so far on disk.
        torch.save(best_model_wts, checkpoint_path)

    # Early stopping if both train and validation accuracy >= 0.80
    if acc >= 0.80 and val_acc >= 0.80:
        print(f"Early stopping: Train and Validation accuracy >= 80% at epoch {epoch+1}.")
        break

# Load best model weights after training
model.load_state_dict(best_model_wts)
print(f"Best Validation Accuracy Achieved: {best_val_acc:.4f}")

# Final save: also covers the case where no epoch improved on the initial
# best_val_acc, in which case nothing was written inside the loop.
torch.save(model.state_dict(), checkpoint_path)
print(f'Best model saved as {checkpoint_path}')


### Gradio Implementation

In [None]:
# Gradio Interface Part
def load_model(weights_path='best_fer32_model.pth'):
    """Rebuild the 32x32 ResNet18 classifier and load trained weights for inference.

    The architecture is reconstructed exactly as in the training cell
    (3x3 stride-1 stem conv, no max-pool, dropout + linear head) so the saved
    state dict keys and shapes line up.

    Args:
        weights_path: Path to a state dict written with ``torch.save``.

    Returns:
        The model on ``device``, switched to eval mode.
    """
    # Local name differs from the notebook-level `model` to avoid confusing the
    # inference network with the one being trained.
    net = models.resnet18(weights=None)
    net.conv1 = nn.Conv2d(3, 64, kernel_size=3, stride=1, padding=1, bias=False)
    net.maxpool = nn.Identity()
    num_classes = len(train_ds.features['labels'].names)
    net.fc = nn.Sequential(
        nn.Dropout(0.5),
        nn.Linear(net.fc.in_features, num_classes)
    )
    # weights_only=True restricts unpickling to tensors and plain containers,
    # so a tampered checkpoint file cannot execute arbitrary code on load.
    state_dict = torch.load(weights_path, map_location=device, weights_only=True)
    net.load_state_dict(state_dict)
    net = net.to(device)
    net.eval()
    return net

model_infer = load_model()

def _equalize_luminance(rgb):
    """Histogram-equalize the Y channel of an RGB uint8 array and return RGB."""
    yuv = cv2.cvtColor(rgb, cv2.COLOR_RGB2YUV)
    yuv[:, :, 0] = cv2.equalizeHist(yuv[:, :, 0])
    return cv2.cvtColor(yuv, cv2.COLOR_YUV2RGB)


def predict(img):
    """Classify the facial expression in an uploaded image.

    Args:
        img: Image as a numpy array, as delivered by ``gr.Image(type="numpy")``.

    Returns:
        Dict with the predicted label under 'Expression' and the softmax
        probability of that label, formatted as a percentage, under 'Confidence'.

    Raises:
        gr.Error: If no image was provided.
    """
    # Gradio passes None when the form is submitted without an image.
    if img is None:
        raise gr.Error("Please upload an image first.")
    rgb = np.array(Image.fromarray(img).convert('RGB'))
    # Apply the same luminance equalization FERDataset applies to every
    # training/test sample; without it the model would see inputs preprocessed
    # differently from the ones it was trained on.
    pil_img = Image.fromarray(_equalize_luminance(rgb))
    with torch.no_grad():
        img_t = test_transform(pil_img).unsqueeze(0).to(device)
        output = model_infer(img_t)
        probs = torch.softmax(output, dim=1).cpu().numpy()[0]
    pred = int(np.argmax(probs))
    confidence = float(probs[pred]) * 100
    label_map = train_ds.features['labels'].names
    return {
        'Expression': label_map[pred],
        'Confidence': f'{confidence:.2f}%'
    }

# Assemble the web UI: one image upload in, the prediction dict out as JSON.
ui_title = "Facial Expression Recognition (Transfer Learning)"
ui_description = "Upload a face image. The model will predict the expression and show the confidence."
image_input = gr.Image(type="numpy")  # hands predict() a numpy array
json_output = gr.JSON()

iface = gr.Interface(
    fn=predict,
    inputs=image_input,
    outputs=json_output,
    title=ui_title,
    description=ui_description,
)

# Start serving the interface.
iface.launch()