In [30]:
import pandas as pd
import os
import torch
import torchvision.transforms as transforms
import torchvision.models as models
from torch.utils.data import DataLoader, Dataset
import torch.nn as nn
import torch.optim as optim
from PIL import Image
from torch.optim import lr_scheduler
from tqdm import tqdm

# Load the annotations for training and validation from separate CSV files
IMAGE_FOLDER = r"D:\\DebosmitaPhD\\VALENCE-AROUSAL\\AffectNet\\train_set\\images"
IMAGE_FOLDER_TEST = r"D:\\DebosmitaPhD\\VALENCE-AROUSAL\\AffectNet\\val_set\\images"
train_annotations_path = (
    r"D:\\DebosmitaPhD\\VALENCE-AROUSAL\\CAGE-Affectnet-CVPR\\affectnet_annotations\\train_set_annotation_without_lnd.csv"
)
valid_annotations_path = (
    r"D:\\DebosmitaPhD\\VALENCE-AROUSAL\\CAGE-Affectnet-CVPR\\affectnet_annotations\\val_set_annotation_without_lnd.csv"
)
train_annotations_df = pd.read_csv(train_annotations_path)
valid_annotations_df = pd.read_csv(valid_annotations_path)

In [31]:
from torch.utils.data import Subset

# Set parameters
BATCHSIZE = 128
NUM_EPOCHS = 20
LR = 0.0001#4e-5
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")


# **** Create dataset and data loaders ****
class CustomDataset(Dataset):
    def __init__(self, dataframe, root_dir, transform=None, balance=False):
        self.dataframe = dataframe
        self.transform = transform
        self.root_dir = root_dir
        self.balance = balance

        if self.balance:
            self.dataframe = self.balance_dataset()

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        image_path = os.path.join(
            self.root_dir, f"{self.dataframe['number'].iloc[idx]}.jpg"
        )
        image = Image.open(image_path)

        classes = torch.tensor(self.dataframe.iloc[idx, 1], dtype=torch.int8)
        valence = torch.tensor(self.dataframe.iloc[idx, 2], dtype=torch.float16)
        arousal = torch.tensor(self.dataframe.iloc[idx, 3], dtype=torch.float16)

        if self.transform:
            image = self.transform(image)

        return image, classes, valence, arousal

    def balance_dataset(self):
        balanced_df = self.dataframe.groupby("exp", group_keys=False).apply(
            lambda x: x.sample(self.dataframe["exp"].value_counts().min())
        )
        return balanced_df


transform = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(0.5),
        transforms.RandomGrayscale(0.01),
        transforms.RandomRotation(10),
        transforms.ColorJitter(
            brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1
        ),  # model more robust to changes in lighting conditions.
        transforms.RandomPerspective(
            distortion_scale=0.2, p=0.5
        ),  # can be helpful if your images might have varying perspectives.
        transforms.ToTensor(),  # saves image as tensor (automatically divides by 255)
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
        transforms.RandomErasing(
            p=0.5, scale=(0.02, 0.2), ratio=(0.3, 3.3), value="random"
        ),  # Should help overfitting
    ]
)

transform_valid = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
    ]
)

# Select the first 1000 indices for faster experimentation
# subset_indices = list(range(100))

# Create a subset of the training dataset
# train_subset = Subset(
#     CustomDataset(
#         dataframe=train_annotations_df,
#         root_dir=IMAGE_FOLDER,
#         transform=transform,
#         balance=True,
#     ),
#     subset_indices,
# )

# val_subset = Subset(
#     CustomDataset(
#         dataframe=train_annotations_df,
#         root_dir=IMAGE_FOLDER_TEST,
#         transform=transform,
#         balance=False,
#     ),
#     subset_indices,
# )

train_dataset = CustomDataset(
    dataframe=train_annotations_df,
    root_dir=IMAGE_FOLDER,
    transform=transform,
    balance=True,
)


valid_dataset = CustomDataset(
    dataframe=valid_annotations_df,
    root_dir=IMAGE_FOLDER_TEST,
    transform=transform_valid,
    balance=False,
)
train_loader = DataLoader(
    train_dataset, batch_size=BATCHSIZE, shuffle=True, num_workers=0
)
# Use the subset in the DataLoader
# train_loader = DataLoader(
#     train_subset,
#     batch_size=BATCHSIZE,
#     shuffle=True,
#     num_workers=0
# )
valid_loader = DataLoader(
    valid_dataset, batch_size=BATCHSIZE, shuffle=False, num_workers=0
)

In [32]:
# Display a single batch from the train_loader
for images, classes, valence, arousal in train_loader:
    print("Batch of Images:")
    print(images.shape)  # Shape of the images tensor
    print("Batch of Classes (Labels):")
    print(classes)  # Tensor of class labels
    print("Batch of Valence Values:")
    print(valence)  # Tensor of valence values
    print("Batch of Arousal Values:")
    print(arousal)  # Tensor of arousal values
    break  # Break after the first batch to avoid printing all batches

Batch of Images:
torch.Size([128, 3, 224, 224])
Batch of Classes (Labels):
tensor([1, 5, 7, 1, 5, 3, 3, 6, 5, 0, 4, 2, 7, 3, 3, 2, 7, 5, 6, 3, 3, 4, 7, 2,
        2, 5, 6, 5, 2, 7, 5, 2, 3, 4, 6, 7, 7, 6, 5, 5, 0, 2, 4, 4, 7, 2, 6, 6,
        5, 6, 2, 0, 6, 6, 7, 1, 0, 4, 2, 4, 6, 3, 4, 1, 1, 1, 7, 3, 3, 4, 6, 2,
        0, 5, 2, 7, 2, 1, 7, 2, 3, 3, 4, 7, 3, 5, 6, 3, 6, 5, 5, 5, 3, 3, 3, 1,
        0, 2, 0, 2, 5, 2, 2, 4, 2, 4, 3, 7, 5, 1, 6, 6, 7, 7, 3, 0, 0, 3, 0, 0,
        4, 0, 1, 0, 4, 7, 1, 2], dtype=torch.int8)
Batch of Valence Values:
tensor([ 0.3413, -0.7900, -0.5664,  0.4951, -0.7617,  0.2566, -0.0823, -0.5615,
        -0.7271,  0.0000, -0.1869, -0.5176, -0.6509,  0.1587,  0.0435, -0.6982,
        -0.5225, -0.7197, -0.4016,  0.1984,  0.1208, -0.0968, -0.6387, -0.3174,
        -0.5664, -0.7617, -0.3389, -0.8130, -0.6899, -0.5469, -0.6816, -0.5337,
         0.2778, -0.0862, -0.1016, -0.6279, -0.6348, -0.4443, -0.8252, -0.7109,
         0.0000, -0.5708, -0.1016, -0.1190, -0.61

In [33]:
# ***** Define the model *****

# Initialize the model
MODEL = models.efficientnet_v2_s(weights="DEFAULT")
# Freeze feature extractor layers
# for param in MODEL.features.parameters():
#     param.requires_grad = False
num_features = MODEL.classifier[1].in_features
# MODEL.classifier[1] = nn.Linear(in_features=num_features, out_features=10)
# MODEL.to(DEVICE)
# MODEL.load_state_dict(torch.load("../AffectNet8_Efficientnet_Combined/model.pt"))
MODEL.classifier[1] = nn.Linear(in_features=num_features, out_features=2)
MODEL.to(DEVICE)

EfficientNet(
  (features): Sequential(
    (0): Conv2dNormActivation(
      (0): Conv2d(3, 24, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1), bias=False)
      (1): BatchNorm2d(24, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
      (2): SiLU(inplace=True)
    )
    (1): Sequential(
      (0): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(24, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(24, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
            (2): SiLU(inplace=True)
          )
        )
        (stochastic_depth): StochasticDepth(p=0.0, mode=row)
      )
      (1): FusedMBConv(
        (block): Sequential(
          (0): Conv2dNormActivation(
            (0): Conv2d(24, 24, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
            (1): BatchNorm2d(24, eps=0.001, momentum=0.1, affine=True, track_running_stats=True)
  

In [34]:
def CCCLoss(x, y):
    # Compute means
    x_mean = torch.mean(x, dim=0)
    y_mean = torch.mean(y, dim=0)
    # Compute variances
    x_var = torch.var(x, dim=0)
    y_var = torch.var(y, dim=0)
    # Compute covariance matrix
    cov_matrix = torch.matmul(
        (x - x_mean).permute(*torch.arange(x.dim() - 1, -1, -1)), y - y_mean
    ) / (x.size(0) - 1)
    # Compute CCC
    numerator = 2 * cov_matrix
    denominator = x_var + y_var + torch.pow((x_mean - y_mean), 2)
    ccc = torch.mean(numerator / denominator)
    return -ccc


val_loss = nn.MSELoss()
aro_loss = nn.MSELoss()

optimizer = optim.AdamW(MODEL.parameters(), lr=LR)
lr_scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max=BATCHSIZE * NUM_EPOCHS)

# ***** Train the model *****
print("--- Start training ---")
scaler = torch.cuda.amp.GradScaler()
best_valid_loss = 100
l2_lambda = 0.00001  # L1 Regularization
l1_lambda = 0.00001  # L2 Regularization

for epoch in range(NUM_EPOCHS):
    MODEL.train()
    total_train_correct = 0
    total_train_samples = 0
    current_lr = optimizer.param_groups[0]["lr"]
    for images, _, val_true, aro_true in tqdm(
        train_loader, desc="Epoch train_loader progress"
    ):
        images, val_true, aro_true = (
            images.to(DEVICE),
            val_true.to(DEVICE),
            aro_true.to(DEVICE),
        )
        optimizer.zero_grad()
        train_loss = 0
        l2_reg = 0
        l1_reg = 0
        with torch.autocast(device_type="cuda", dtype=torch.float16):
            outputs = MODEL(images)
            val_pred = outputs[:, 0]
            aro_pred = outputs[:, 1]
            for param in MODEL.parameters():
                l2_reg += torch.norm(param, 2)  # **2
                l1_reg += torch.norm(param, 1)
            loss = (
                3 * val_loss(val_pred.cuda(), val_true.cuda())
                + 3 * aro_loss(aro_pred.cuda(), aro_true.cuda())
                + CCCLoss(val_pred.cuda(), val_true.cuda())
                + CCCLoss(aro_pred.cuda(), aro_true.cuda())
            )
            train_loss += loss.item()
            scaler.scale(loss).backward()
            scaler.step(optimizer)
            scaler.update()

    MODEL.eval()
    valid_loss = 0.0
    total_valid_correct = 0
    total_valid_samples = 0
    with torch.no_grad():
        for images, _, val_true, aro_true in valid_loader:
            images, val_true, aro_true = (
                images.to(DEVICE),
                val_true.to(DEVICE),
                aro_true.to(DEVICE),
            )
            with torch.autocast(device_type="cuda", dtype=torch.float16):
                outputs = MODEL(images)
                val_pred = outputs[:, 0]
                aro_pred = outputs[:, 1]
                loss = (
                    3 * val_loss(val_pred.cuda(), val_true.cuda())
                    + 3 * aro_loss(aro_pred.cuda(), aro_true.cuda())
                    + CCCLoss(val_pred.cuda(), val_true.cuda())
                    + CCCLoss(aro_pred.cuda(), aro_true.cuda())
                )
                valid_loss += loss.item()

    print(
        f"Epoch [{epoch+1}/{NUM_EPOCHS}] - "
        f"Training Loss: {train_loss/len(train_loader):.4f}, "
        f"Validation Loss: {valid_loss/len(valid_loader):.4f}, "
        f"Learning Rate: {current_lr:.8f}, "
    )

    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        print(f"Saving model at epoch {epoch+1}")
        torch.save(MODEL.state_dict(), "model.pt")  # Save the best model


  scaler = torch.cuda.amp.GradScaler()


--- Start training ---


Epoch train_loader progress:  34%|███▎      | 79/235 [03:30<06:55,  2.66s/it]


KeyboardInterrupt: 