In [1]:
import os
import time
from typing import Dict

import pandas as pd
from PIL import Image
from tqdm.auto import tqdm

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from torchvision import models, transforms
from torch.utils.data import Dataset, DataLoader, random_split
from torchvision.models import ResNet18_Weights

In [2]:
ROOT_DIR = "/data/mostafa/rayan/notebooks/haji/affect-net/dataset"

In [3]:
class CustomDataset(Dataset):
    def __init__(self, root_dir: str, transform) -> None:
        self.root_dir: str = root_dir
        self.transform = transform

        path = os.path.join(root_dir, "labels.csv")
        self.df = pd.read_csv(path, index_col=0)

        self.classes = self.df.label.unique().tolist()
        self.c2l = {c: i for i, c in enumerate(self.classes)}
        self.l2c = {i: c for i, c in enumerate(self.classes)}

    def load_image(self, image_path: str):
        try:
            image = Image.open(image_path).convert("RGB")
            return self.transform(image)
        except (OSError, ValueError) as e:
            return None

    def __len__(self) -> int:
        return len(self.df)
        
    def __getitem__(self, index: int):
        row = self.df.iloc[index]
        class_ = row["label"]
        image_sub_path = row["pth"]

        # load image
        image_path = os.path.join(self.root_dir, image_sub_path)
        image = self.load_image(image_path)
        if image is None:
            next_index = (index + 1) % len(self)
            return self.__getitem__(index=next_index)
        
        # convert class to label
        label = self.c2l[class_]
        
        return image, label

# validate dataset

In [4]:
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.48145466, 0.4578275, 0.40821073],
        std=[0.26862954, 0.26130258, 0.27577711]
    )
])

dataset = CustomDataset(root_dir=ROOT_DIR, transform=transform)
dataloader = DataLoader(dataset=dataset, batch_size=4)

features, labels = next(iter(dataloader))
print("class size:", len(dataset.classes))
print(features.shape, labels.shape)

class size: 8
torch.Size([4, 3, 224, 224]) torch.Size([4])


# Model Defenition

In [5]:
class Model(nn.Module):
    def __init__(self, num_classes: int):
        super().__init__()
        self.resnet = models.resnet18(weights=ResNet18_Weights.DEFAULT)
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, num_classes)
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        return self.softmax(self.resnet(x))

# validate model

In [6]:
model = Model(num_classes=8)

sample_input = torch.randn(16, 3, 224, 224)
output = model(sample_input)
print("Output shape:", output.shape)

Output shape: torch.Size([16, 8])


# Train model

In [7]:
def validate_model(model, dataloader, criterion, device="cpu"):
    model.eval()  # Set model to evaluation mode
    running_loss = 0.0
    correct = 0
    total = 0

    with torch.no_grad():  # Disable gradient calculation
        for inputs, labels in dataloader:
            inputs, labels = inputs.to(device), labels.to(device)

            # Forward pass
            outputs = model(inputs)
            loss = criterion(outputs, labels)

            # Accumulate loss
            running_loss += loss.item()

            # Compute accuracy
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    avg_loss = running_loss / len(dataloader)
    accuracy = 100 * correct / total
    return avg_loss, accuracy

In [None]:
batch_size = 512
num_epochs = 10
num_classes = 8
split_ratio = 0.9
weight_decay = 1e-4
learning_rate = 1e-3
device = "cuda:1" if torch.cuda.is_available() else "cpu"

# Dataset and DataLoader
transform = transforms.Compose([
    transforms.RandomRotation(degrees=(-30, 30)),
    transforms.RandomResizedCrop(224, scale=(0.8, 1)),
    transforms.Resize((224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.48145466, 0.4578275, 0.40821073],
        std=[0.26862954, 0.26130258, 0.27577711]
    )
])
dataset = CustomDataset(root_dir=ROOT_DIR, transform=transform)
train_size = int(len(dataset) * split_ratio)
val_size = len(dataset) - train_size

train_set, val_set = random_split(dataset, [train_size, val_size])
train_loader = DataLoader(train_set, batch_size=batch_size, shuffle=True)
val_loader = DataLoader(val_set, batch_size=batch_size, shuffle=False)

# Model, loss function, and optimizer
model = Model(num_classes=8)
model = model.to(device)
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=learning_rate, weight_decay=weight_decay)

# Training with validation
for epoch in range(num_epochs):
    print()
    print(f" epoch-{epoch + 1} ".center(32, "="))
    model.train()  # Set model to training mode
    running_loss, running_samples = 0, 0

    # train
    progress_bar = tqdm(train_loader)
    for inputs, labels in progress_bar:
        inputs, labels = inputs.to(device), labels.to(device)

        # Forward pass
        outputs = model(inputs)
        loss = criterion(outputs, labels)
        
        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        
        # Accumulate training loss
        running_loss += loss.item()
        running_samples += len(labels)
        running_loss = running_loss / running_samples

        # Update progress bar
        postfix_str = f"ce: {running_loss:.6f}"
        progress_bar.set_postfix_str(postfix_str)

    # Calculate training loss
    train_loss = running_loss / len(train_loader)

    # Validate the model
    val_loss, val_accuracy = validate_model(model, val_loader, criterion, device)

    # # Update progress bar
    postfix_str = f"train ce: {train_loss:.4f} val ce: {val_loss:.4f} val acc: {val_accuracy:.2f}%"
    print(postfix_str)




  0%|          | 0/50 [00:00<?, ?it/s]

train ce: 0.0000 val ce: 1.8640 val acc: 39.53%



  0%|          | 0/50 [00:00<?, ?it/s]

train ce: 0.0000 val ce: 1.8000 val acc: 46.59%



  0%|          | 0/50 [00:00<?, ?it/s]

train ce: 0.0000 val ce: 1.7381 val acc: 53.09%



  0%|          | 0/50 [00:00<?, ?it/s]

train ce: 0.0000 val ce: 1.7306 val acc: 54.33%



  0%|          | 0/50 [00:00<?, ?it/s]

train ce: 0.0000 val ce: 1.7089 val acc: 56.03%



  0%|          | 0/50 [00:00<?, ?it/s]

train ce: 0.0000 val ce: 1.6685 val acc: 60.65%



  0%|          | 0/50 [00:00<?, ?it/s]

train ce: 0.0000 val ce: 1.6562 val acc: 61.36%



  0%|          | 0/50 [00:00<?, ?it/s]

train ce: 0.0000 val ce: 1.7287 val acc: 54.15%



  0%|          | 0/50 [00:00<?, ?it/s]

train ce: 0.0000 val ce: 1.6809 val acc: 58.66%



  0%|          | 0/50 [00:00<?, ?it/s]

# Save model weights

In [None]:
torch.save(model.state_dict(), "model.pt")

# Inference

In [None]:
CWD_DIR = "/data/mostafa/rayan//notebooks/haji/affect-net"
MODEL_PATH = f"{CWD_DIR}/model.pt"
IMAGE_PATH = f"{CWD_DIR}/dataset/anger/image0000006.jpg"

model.load_state_dict(torch.load(MODEL_PATH, map_location=device, weights_only=True))
model.to(device)

image = dataset.load_image(IMAGE_PATH)
image = image.unsqueeze(0).float().to(device)

model.eval()
with torch.no_grad():
    output = model(image)
    prediction = torch.argmax(output, dim=-1)
    class_ = dataset.l2c[prediction.item()]
    
print(output)
print(prediction)
print(class_)