In [2]:
def replace_second_underscore(s):
    index_to_replace = s.find('_', s.find('_') + 1)
    s = s[:index_to_replace] + '/' + s[index_to_replace + 1:]
    return s

In [16]:
import os
import pandas as pd
from PIL import Image
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import WeightedRandomSampler
import numpy as np
from collections import Counter
import random

class EmotionDataset(Dataset):
    def __init__(self, csv_file, image_dir, transform=None):
        """
        Args:
            csv_file (str): Path to the CSV file with labels.
            image_dir (str): Directory with all the images.
            transform (callable, optional): Optional transform to be applied on a sample.
        """
        self.df = pd.read_csv(csv_file)
        self.image_dir = image_dir
        self.transform = transform
        self.df['image_exists'] = self.df['subDirectory_filePath'].apply(lambda x: os.path.exists(os.path.join(self.image_dir, replace_second_underscore(x))))
        self.df = self.df[self.df['image_exists']].reset_index(drop=True)

        # Label encoding for emotion classes
        self.label_encoder = LabelEncoder()
        self.df['expression'] = self.label_encoder.fit_transform(self.df['expression'])

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # Get the image and label
        row = self.df.iloc[idx]
        image_path = row['subDirectory_filePath']
        image_path = replace_second_underscore(image_path)
        # index_to_replace = image_path.find('_', image_path.find('_') + 1)
        # image_path = image_path[:index_to_replace] + '/' + image_path[index_to_replace + 1:]
        expression = row['expression']
        valence = row['valence']
        arousal = row['arousal']
        
        # Construct the image path
        image_path = os.path.join(self.image_dir, image_path)
        
        # Load the image
        image = Image.open(image_path).convert('RGB')

        # Apply transformations if any
        if self.transform:
            image = self.transform(image)
        
        # Return the image, emotion class label and valence-arousal
        return image, expression, torch.tensor([valence, arousal], dtype=torch.float)

# Define image transformations (resize, normalization, etc.)
transform = transforms.Compose([
    transforms.Resize((128, 128)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225])  # Normalize for pre-trained models
])




# # Initialize the dataset and DataLoader
dataset = EmotionDataset(csv_file='DiffusionFER/DiffusionEmotion_S/dataset_sheet.csv', image_dir='DiffusionFER/', transform=transform)
# # Count the number of instances for each class
# class_counts = np.zeros(7)  # Assuming 7 emotion classes
# for _, emotion_class, _ in dataset:
#     class_counts[emotion_class] += 1

# # Calculate weights for each class (inverse of frequency)
# class_weights = 1.0 / class_counts
# sample_weights = [class_weights[label] for _, label, _ in dataset]

# # Create the WeightedRandomSampler
# sampler = WeightedRandomSampler(weights=sample_weights, num_samples=len(sample_weights), replacement=True)

# Count the number of instances per class
class_counts = Counter([label for _, label, _ in dataset])
max_count = max(class_counts.values())

# Create an empty list to hold the upsampled data
upsampled_data = []

# Perform upsampling for each class
for emotion_class in class_counts:
    class_samples = [sample for sample in dataset if sample[1] == emotion_class]
    # Duplicate until the number matches the majority class
    while len(class_samples) < max_count:
        # Randomly sample with replacement to balance the classes
        class_samples.append(random.choice(class_samples))
    
    upsampled_data.extend(class_samples)

# Shuffle the upsampled dataset to avoid ordering bias
random.shuffle(upsampled_data)

# Create a DataLoader with the upsampled data
dataloader = DataLoader(upsampled_data, batch_size=32, shuffle=True)

# Create a balanced DataLoader using the sampler
# dataloader = DataLoader(dataset, batch_size=32, sampler=sampler)
# dataloader = DataLoader(dataset, batch_size=32, shuffle=True)
# Filter out rows with non-existent images


# Check one batch of data
for images, expressiones, valence_arousal in dataloader:
    print(images.shape)  # Should be (batch_size, 3, 128, 128)
    print(expressiones)
    print(valence_arousal)
    break


torch.Size([32, 3, 128, 128])
tensor([2, 1, 4, 0, 6, 3, 4, 1, 4, 2, 4, 5, 3, 4, 6, 4, 5, 6, 6, 6, 1, 1, 1, 0,
        5, 6, 4, 1, 3, 6, 5, 0])
tensor([[-0.1000,  0.0000],
        [ 0.4000,  0.3000],
        [-0.4000,  0.4000],
        [ 0.0000,  0.2000],
        [-0.4000,  0.4000],
        [ 0.0000,  0.5000],
        [-0.4000,  0.2000],
        [ 0.4000,  0.3000],
        [-0.5000,  0.4000],
        [-0.2000, -0.2000],
        [-0.5000,  0.1000],
        [-0.4000,  0.2000],
        [-0.3000,  0.8000],
        [-0.3000,  0.5000],
        [-0.7000,  0.6000],
        [-0.6000,  0.2000],
        [-0.6000,  0.3000],
        [-0.5000,  0.8000],
        [-0.6000,  0.4000],
        [-0.5000,  0.4000],
        [ 0.4000,  0.3000],
        [ 0.4000,  0.3000],
        [ 0.4000,  0.3000],
        [-0.1000,  0.0000],
        [-0.6000,  0.2000],
        [-0.5000,  0.3000],
        [-0.5000,  0.4000],
        [ 0.5000,  0.4000],
        [-0.1000,  0.7000],
        [-0.7000,  0.5000],
        [-0.5000,

In [17]:
import torch.nn as nn
import torch.optim as optim

class EmotionModel(nn.Module):
    def __init__(self):
        super(EmotionModel, self).__init__()
        self.conv1 = nn.Conv2d(3, 32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, kernel_size=3, padding=1)
        
        self.fc1 = nn.Linear(128 * 16 * 16, 512)
        self.fc2 = nn.Linear(512, 7)  # Output for emotion classification (7 classes)
        
        self.fc3 = nn.Linear(512, 2)  # Output for valence-arousal regression

    def forward(self, x):
        x = torch.relu(self.conv1(x))
        x = torch.max_pool2d(x, 2)
        x = torch.relu(self.conv2(x))
        x = torch.max_pool2d(x, 2)
        x = torch.relu(self.conv3(x))
        x = torch.max_pool2d(x, 2)

        x = x.view(x.size(0), -1)  # Flatten
        
        x = torch.relu(self.fc1(x))
        
        emotion_class_output = self.fc2(x)  # Emotion class prediction
        valence_arousal_output = self.fc3(x)  # Valence-arousal prediction
        
        return emotion_class_output, valence_arousal_output

In [18]:
model = EmotionModel()

if torch.cuda.is_available():
    model.cuda()

In [7]:
# Loss function for multi-task learning
criterion_classification = nn.CrossEntropyLoss()
criterion_regression = nn.MSELoss()

# Optimizer
optimizer = optim.Adam(model.parameters(), lr=0.001)

In [54]:
# # Split the dataset into training and testing sets
# train_size = int(0.8 * len(dataset))
# test_size = len(dataset) - train_size
# train_dataset, test_dataset = torch.utils.data.random_split(dataset, [train_size, test_size])
# # Create DataLoader for training and testing sets
# train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
# test_dataloader = DataLoader(test_dataset, batch_size=32, shuffle=False)

In [19]:
# Training loop
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
num_epochs = 7
for epoch in range(num_epochs):
    model.train()  # Set the model to training mode
    running_loss_classification = 0.0
    running_loss_regression = 0.0
    correct_classifications = 0
    total_samples = 0
    
    for images, emotion_classes, valence_arousal in dataloader:
        # Move to device (if using GPU)
        images = images.to(device)
        emotion_classes = emotion_classes.to(device)
        valence_arousal = valence_arousal.to(device)
        
        # Zero gradients
        optimizer.zero_grad()
        
        # Forward pass
        # emotion_class_output = model(images)
        emotion_class_output, valence_arousal_output = model(images)
        
        # Calculate the classification loss (cross-entropy)
        loss_classification = criterion_classification(emotion_class_output, emotion_classes)
        
        # Calculate the regression loss (MSE)
        loss_regression = criterion_regression(valence_arousal_output, valence_arousal)
        
        # Total loss is a sum of both losses
        # total_loss = loss_classification
        total_loss = loss_classification + loss_regression
        
        # Backward pass and optimize
        total_loss.backward()
        optimizer.step()
        
        # Statistics
        running_loss_classification += loss_classification.item()
        running_loss_regression += loss_regression.item()
        
        # Accuracy for classification
        _, predicted = torch.max(emotion_class_output, 1)
        correct_classifications += (predicted == emotion_classes).sum().item()
        total_samples += images.size(0)
    
   
    
    
    # Print epoch statistics
    print(f'Epoch [{epoch+1}/{num_epochs}], Loss: {running_loss_classification/len(dataloader):.4f}, '
          f'Reg Loss: {running_loss_regression/len(dataloader):.4f}, '
          f'Acc: {100 * correct_classifications/total_samples:.2f}%')

Epoch [1/7], Loss: 1.9464, Reg Loss: 0.1442, Acc: 15.54%
Epoch [2/7], Loss: 1.9464, Reg Loss: 0.1443, Acc: 15.54%
Epoch [3/7], Loss: 1.9464, Reg Loss: 0.1442, Acc: 15.54%
Epoch [4/7], Loss: 1.9464, Reg Loss: 0.1442, Acc: 15.54%
Epoch [5/7], Loss: 1.9464, Reg Loss: 0.1442, Acc: 15.54%
Epoch [6/7], Loss: 1.9464, Reg Loss: 0.1442, Acc: 15.54%
Epoch [7/7], Loss: 1.9464, Reg Loss: 0.1442, Acc: 15.54%


In [20]:
torch.save(model.state_dict(), 'emotion_model.pth')