In [4]:
import os
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
import torchvision.models as models
import torchvision.datasets as datasets
from torch.utils.data import DataLoader, WeightedRandomSampler
import numpy as np
from tqdm import tqdm
from torchvision.models.efficientnet import EfficientNet_B2_Weights

# Paths
data_dir = "../FBMM/test"
train_dir = os.path.join(data_dir, "train")
val_dir = os.path.join(data_dir, "val")
test_dir = os.path.join(data_dir, "test")
model_save_path = "./models/optimized_efficientnet_b2_emotion_model.pth"

# Configuration
batch_size = 32
num_epochs = 5
initial_lr = 1e-3
num_classes = 7
img_height, img_width = 260, 260
seed = 42  # For reproducibility
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Emotion categories
emotion_classes = ["Anger", "Disgust", "Fear", "Happy", "Neutral", "Sad", "Surprise"]

# Data Augmentation & Normalization
weights = EfficientNet_B2_Weights.IMAGENET1K_V1
transform = transforms.Compose([
    transforms.Resize((img_height, img_width)),
    transforms.ToTensor(),
    weights.transforms()
])

# Load Datasets
train_dataset = datasets.ImageFolder(root=train_dir, transform=transform)
val_dataset = datasets.ImageFolder(root=val_dir, transform=transform)
test_dataset = datasets.ImageFolder(root=test_dir, transform=transform)

# Compute Class Weights
def compute_class_weights(dataset, num_classes):
    labels = np.array([label for _, label in dataset.samples])
    class_counts = np.bincount(labels, minlength=num_classes)
    class_weights = 1.0 / (class_counts + 1e-6)
    class_weights /= class_weights.sum()
    return torch.tensor(class_weights, dtype=torch.float32).to(device)

class_weights = compute_class_weights(train_dataset, num_classes)

# Data Loaders with Weighted Sampling
def get_sampler(dataset):
    labels = np.array([label for _, label in dataset.samples])
    class_sample_counts = np.bincount(labels)
    weights = 1.0 / (class_sample_counts[labels] + 1e-6)
    return WeightedRandomSampler(weights, len(weights))

train_loader = DataLoader(train_dataset, batch_size=batch_size, sampler=get_sampler(train_dataset))
val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

# Load Pretrained EfficientNet Model & Fine-Tuning
def load_model(num_classes):
    print("Loading and configuring the model...")
    
    model = models.efficientnet_b2(weights=weights)  # ✅ Using ImageNet weights

    # Freeze all layers initially
    for param in model.parameters():
        param.requires_grad = False

    # Unlock last 20% of convolutional layers + classifier head
    total_layers = len(list(model.features.children()))
    fine_tune_layers = int(total_layers * 0.2)

    for layer in list(model.features.children())[-fine_tune_layers:]:
        for param in layer.parameters():
            param.requires_grad = True

    # Modify classifier head
    model.classifier = nn.Sequential(
        nn.Dropout(0.6),
        nn.Linear(model.classifier[1].in_features, num_classes)
    )

    # Ensure classifier is trainable
    for param in model.classifier.parameters():
        param.requires_grad = True

    return model.to(device)

# Load model
model = load_model(num_classes)

# Loss & Optimizer
criterion = nn.CrossEntropyLoss(weight=class_weights)
optimizer = optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), lr=initial_lr)
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.5, patience=3)

# Training Loop
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs):
    best_val_loss = np.inf
    for epoch in range(1, num_epochs + 1):
        model.train()
        running_loss, correct, total = 0.0, 0, 0

        for images, labels in tqdm(train_loader, desc=f"Epoch {epoch}/{num_epochs}"):
            images, labels = images.to(device), labels.to(device)
            optimizer.zero_grad()

            outputs = model(images)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item() * images.size(0)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

        train_loss = running_loss / total
        train_acc = correct / total

        # Validation
        model.eval()
        val_loss, val_correct, val_total = 0.0, 0, 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(device), labels.to(device)
                outputs = model(images)
                loss = criterion(outputs, labels)
                val_loss += loss.item() * images.size(0)
                _, predicted = torch.max(outputs, 1)
                val_correct += (predicted == labels).sum().item()
                val_total += labels.size(0)

        val_loss /= val_total
        val_acc = val_correct / val_total

        print(f"\nEpoch {epoch}: Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.4f} | Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.4f}")

        # Learning Rate Scheduling
        scheduler.step(val_loss)

        # Save best model
        if val_loss < best_val_loss:
            best_val_loss = val_loss
            torch.save(model.state_dict(), model_save_path)
            print("Model Saved!")

# Train the model
train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, num_epochs)

# Evaluation on Test Set
def evaluate_model(model, test_loader):
    model.eval()
    correct, total = 0, 0
    with torch.no_grad():
        for images, labels in test_loader:
            images, labels = images.to(device), labels.to(device)
            outputs = model(images)
            _, predicted = torch.max(outputs, 1)
            correct += (predicted == labels).sum().item()
            total += labels.size(0)

    accuracy = correct / total
    print(f"Test Accuracy: {accuracy:.4f}")

# Load best model for testing
model.load_state_dict(torch.load(model_save_path))
evaluate_model(model, test_loader)


Loading and configuring the model...


Epoch 1/5: 100%|██████████| 5/5 [00:10<00:00,  2.04s/it]



Epoch 1: Train Loss: 1.9379, Train Acc: 0.2643 | Val Loss: 1.9324, Val Acc: 0.2214
Model Saved!


Epoch 2/5: 100%|██████████| 5/5 [00:10<00:00,  2.09s/it]



Epoch 2: Train Loss: 1.7532, Train Acc: 0.4000 | Val Loss: 1.9332, Val Acc: 0.2000


Epoch 3/5: 100%|██████████| 5/5 [00:09<00:00,  1.96s/it]



Epoch 3: Train Loss: 1.5975, Train Acc: 0.4929 | Val Loss: 1.9372, Val Acc: 0.2357


Epoch 4/5: 100%|██████████| 5/5 [00:10<00:00,  2.06s/it]



Epoch 4: Train Loss: 1.4422, Train Acc: 0.5643 | Val Loss: 1.9316, Val Acc: 0.2286
Model Saved!


Epoch 5/5: 100%|██████████| 5/5 [00:09<00:00,  1.99s/it]



Epoch 5: Train Loss: 1.3217, Train Acc: 0.6429 | Val Loss: 1.9087, Val Acc: 0.2500
Model Saved!


  model.load_state_dict(torch.load(model_save_path))


Test Accuracy: 0.2286
