In [None]:
import torch
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
import os
import json

from PIL import Image


class EmoSet(Dataset):
    """Image emotion dataset backed by JSON index files under ``data_root``.

    Expected layout, as read by this class:

    * ``<data_root>/info.json`` -- for each attribute (and ``'emotion'``) a
      ``'label2idx'`` mapping from label string to integer index.
    * ``<data_root>/<phase>.json`` -- a list of items where ``item[0]`` is the
      emotion label, ``item[1]`` the image path and ``item[2]`` the annotation
      path, both relative to ``data_root``.

    Each sample is a dict with the transformed image, the emotion label index,
    one ``<attribute>_label_idx`` per multi-class attribute (``-1`` when the
    annotation file does not contain that attribute) and a multi-hot float
    tensor for the multi-label ``object`` attribute.
    """

    # Attributes that carry at most one label per image.
    ATTRIBUTES_MULTI_CLASS = [
        'scene', 'facial_expression', 'human_action', 'brightness', 'colorfulness',
    ]
    # Attributes that may carry several labels per image.
    ATTRIBUTES_MULTI_LABEL = [
        'object'
    ]
    # Number of classes per attribute; used to size the multi-hot 'object' vector.
    NUM_CLASSES = {
        'brightness': 11,
        'colorfulness': 11,
        'scene': 254,
        'object': 409,
        'facial_expression': 6,
        'human_action': 264,
    }

    def __init__(self,
                 data_root,
                 num_emotion_classes,
                 phase,
                 ):
        """Load the label metadata and the sample index for one split.

        Args:
            data_root: Directory containing ``info.json`` and ``<phase>.json``.
            num_emotion_classes: ``8`` to use the emotion mapping from
                ``info.json`` or ``2`` for the positive/negative grouping.
            phase: One of ``'train'``, ``'val'`` or ``'test'``.

        Raises:
            AssertionError: If ``num_emotion_classes`` or ``phase`` is invalid.
        """
        assert num_emotion_classes in (8, 2)
        assert phase in ('train', 'val', 'test')
        self.transforms_dict = self.get_data_transforms()

        self.info = self.get_info(data_root, num_emotion_classes)

        # Backstop for runs with assertions disabled (``python -O``).
        if phase not in self.transforms_dict:
            raise NotImplementedError
        self.transform = self.transforms_dict[phase]

        # Close the index file deterministically instead of leaking the handle.
        with open(os.path.join(data_root, f'{phase}.json')) as index_file:
            raw_items = json.load(index_file)

        emotion_label2idx = self.info['emotion']['label2idx']
        # Each entry: [emotion index, absolute image path, absolute annotation path].
        self.data_store = [
            [
                emotion_label2idx[item[0]],
                os.path.join(data_root, item[1]),
                os.path.join(data_root, item[2])
            ]
            for item in raw_items
        ]

    @classmethod
    def get_data_transforms(cls):
        """Return the torchvision preprocessing pipeline for each phase.

        Training uses a random resized crop plus horizontal flip; validation
        and test use a deterministic resize and center crop. All pipelines end
        with the same tensor conversion and normalization.
        """
        transforms_dict = {
            'train': transforms.Compose([
                transforms.RandomResizedCrop(224),
                transforms.RandomHorizontalFlip(),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ]),
            'val': transforms.Compose([
                transforms.Resize(224),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ]),
            'test': transforms.Compose([
                transforms.Resize(224),
                transforms.CenterCrop(224),
                transforms.ToTensor(),
                transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225])
            ]),
        }
        return transforms_dict

    def get_info(self, data_root, num_emotion_classes):
        """Read ``info.json`` and, for the binary setup, override the emotion mapping.

        Args:
            data_root: Directory containing ``info.json``.
            num_emotion_classes: ``8`` keeps the file's ``'emotion'`` entry
                untouched; ``2`` replaces it with a mapping that groups the
                eight emotions into index ``0`` ('positive') and ``1`` ('negative').

        Returns:
            The parsed metadata dict.
        """
        assert num_emotion_classes in (8, 2)
        with open(os.path.join(data_root, 'info.json')) as info_file:
            info = json.load(info_file)

        if num_emotion_classes == 2:
            info['emotion'] = {
                'label2idx': {
                    'amusement': 0,
                    'awe': 0,
                    'contentment': 0,
                    'excitement': 0,
                    'anger': 1,
                    'disgust': 1,
                    'fear': 1,
                    'sadness': 1,
                },
                'idx2label': {
                    '0': 'positive',
                    '1': 'negative',
                }
            }
        elif num_emotion_classes != 8:
            # Only reachable when assertions are disabled.
            raise NotImplementedError

        return info

    def load_image_by_path(self, path):
        """Open the image at ``path``, convert it to RGB and apply ``self.transform``."""
        # ``Image.open`` is lazy; the context manager releases the file handle
        # once ``convert`` has decoded the pixels into a new image object.
        with Image.open(path) as raw_image:
            image = raw_image.convert('RGB')
        return self.transform(image)

    def load_annotation_by_path(self, path):
        """Return the parsed JSON annotation stored at ``path``."""
        with open(path) as annotation_file:
            return json.load(annotation_file)

    def __getitem__(self, item):
        """Build the sample dict for index ``item``.

        Returns:
            Dict with keys ``'image'``, ``'emotion_label_idx'`` and one
            ``'<attribute>_label_idx'`` per attribute listed in
            ``ATTRIBUTES_MULTI_CLASS`` and ``ATTRIBUTES_MULTI_LABEL``.
        """
        emotion_label_idx, image_path, annotation_path = self.data_store[item]
        image = self.load_image_by_path(image_path)
        annotation_data = self.load_annotation_by_path(annotation_path)
        data = {'image': image, 'emotion_label_idx': emotion_label_idx}

        for attribute in self.ATTRIBUTES_MULTI_CLASS:
            # Missing attribute -> -1, otherwise the index of its label.
            attribute_label_idx = -1
            if attribute in annotation_data:
                # Annotation values are stringified before lookup, so
                # 'label2idx' keys are expected to be strings.
                label_key = str(annotation_data[attribute])
                attribute_label_idx = self.info[attribute]['label2idx'][label_key]
            data[f'{attribute}_label_idx'] = attribute_label_idx

        for attribute in self.ATTRIBUTES_MULTI_LABEL:
            # Multi-hot encoding: 1 at every annotated label index, 0 elsewhere.
            assert attribute == 'object'
            num_classes = self.NUM_CLASSES[attribute]
            attribute_label_idx = torch.zeros(num_classes)
            if attribute in annotation_data:
                label2idx = self.info[attribute]['label2idx']
                for label in annotation_data[attribute]:
                    attribute_label_idx[label2idx[label]] = 1
            data[f'{attribute}_label_idx'] = attribute_label_idx

        return data

    def __len__(self):
        """Return the number of samples in the loaded split."""
        return len(self.data_store)


if __name__ == '__main__':
    # Smoke test: build the training split and print the labels of one batch.
    data_root = "dataset"
    num_emotion_classes = 8
    phase = 'train'

    dataset = EmoSet(data_root=data_root, num_emotion_classes=num_emotion_classes, phase=phase)
    dataloader = DataLoader(dataset, batch_size=256, shuffle=True)
    print(dataloader)

    # Label entries of the sample dict, in the order they are printed.
    label_keys = (
        'emotion_label_idx',
        'scene_label_idx',
        'facial_expression_label_idx',
        'human_action_label_idx',
        'brightness_label_idx',
        'colorfulness_label_idx',
        'object_label_idx',
    )
    for data in dataloader:
        for key in label_keys:
            print(data[key])
        # Only the first batch is inspected.
        break



In [None]:
import copy
import os

import matplotlib.pyplot as plt
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.models as models
from sklearn.metrics import confusion_matrix, classification_report
from torch.utils.data import DataLoader

class EmotionRecognitionModel(nn.Module):
    """Image-only emotion classifier built on an ImageNet-pretrained ResNet-50.

    The backbone is every child module of ResNet-50 except the last one (its
    ``fc`` classification head). Its flattened output is projected to
    ``hidden_dim``, passed through dropout and mapped to ``num_emotions`` logits.

    Args:
        num_emotions: Number of output classes.
        hidden_dim: Width of the projection between backbone and classifier.
    """

    def __init__(self, num_emotions=8, hidden_dim=128):
        super().__init__()

        # ``pretrained=True`` is deprecated in favour of the ``weights=`` enum
        # (torchvision >= 0.13). IMAGENET1K_V1 is the checkpoint the legacy flag
        # selected; the fallback keeps older torchvision releases working.
        if hasattr(models, 'ResNet50_Weights'):
            resnet = models.resnet50(weights=models.ResNet50_Weights.IMAGENET1K_V1)
        else:
            resnet = models.resnet50(pretrained=True)

        # Drop the final child (the ImageNet classification layer).
        self.cnn = nn.Sequential(*list(resnet.children())[:-1])
        self.image_fc = nn.Linear(resnet.fc.in_features, hidden_dim)

        self.dropout = nn.Dropout(0.5)

        self.classifier = nn.Linear(hidden_dim, num_emotions)

    def forward(self, image):
        """Return emotion logits, one row per input image.

        Args:
            image: Batched image tensor; presumably ``(batch, 3, H, W)`` as
                produced by the dataset transforms -- confirm against caller.

        Returns:
            Tensor of shape ``(batch, num_emotions)``.
        """
        image_features = self.cnn(image)
        # Collapse everything except the batch dimension.
        image_features = torch.flatten(image_features, 1)
        image_features = self.image_fc(image_features)

        image_features = self.dropout(image_features)

        return self.classifier(image_features)



In [None]:




def process_batch(model, batch, device):
    """Move one batch to ``device`` and run it through ``model``.

    Args:
        model: Callable applied to the image tensor.
        batch: Mapping providing ``'image'`` and ``'emotion_label_idx'`` tensors.
        device: Target device for both tensors.

    Returns:
        Tuple ``(outputs, emotion_labels)``: the model output and the labels,
        both on ``device``.
    """
    # Images are transferred first, then labels, then the forward pass runs.
    inputs, targets = (batch[key].to(device) for key in ('image', 'emotion_label_idx'))
    return model(inputs), targets

def train_and_validate(model, train_dataloader, valid_dataloader, criterion, optimizer, num_epochs=10, device='cuda', save_dir='checkpoints'):
    """Train ``model``, validate after every epoch and keep the best weights.

    A checkpoint dict is written to ``save_dir`` after each epoch. When training
    finishes, the weights from the epoch with the highest validation accuracy
    are loaded back into ``model``.

    Args:
        model: Network to train; moved to ``device``.
        train_dataloader: Iterable of training batches accepted by ``process_batch``.
        valid_dataloader: Iterable of validation batches accepted by ``process_batch``.
        criterion: Loss callable taking ``(outputs, emotion_labels)``.
        optimizer: Optimizer updating ``model``'s parameters.
        num_epochs: Number of passes over ``train_dataloader``.
        device: Device used for the model and the batches.
        save_dir: Directory for per-epoch checkpoints; created if missing.

    Returns:
        Tuple ``(model, history)`` where ``history`` holds the per-epoch lists
        ``'train_loss'``, ``'train_acc'``, ``'valid_loss'``, ``'valid_acc'`` and
        the scalar ``'best_acc'`` (accuracies are percentages).
    """
    model.to(device)

    os.makedirs(save_dir, exist_ok=True)

    train_losses = []
    train_accuracies = []
    valid_losses = []
    valid_accuracies = []

    best_valid_acc = 0.0
    # ``state_dict()`` tensors alias the live parameters, so a deep copy is
    # required; otherwise further training silently overwrites the snapshot.
    best_model_wts = copy.deepcopy(model.state_dict())

    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print('-' * 30)

        model.train()
        print("Training...")
        train_epoch_loss, train_epoch_accuracy = _run_epoch(
            model, train_dataloader, criterion, device, optimizer=optimizer
        )
        train_losses.append(train_epoch_loss)
        train_accuracies.append(train_epoch_accuracy)
        print(f"Training - Loss: {train_epoch_loss:.4f}, Accuracy: {train_epoch_accuracy:.2f}%")

        model.eval()
        print("Validating...")
        valid_epoch_loss, valid_epoch_accuracy = _run_epoch(
            model, valid_dataloader, criterion, device
        )
        valid_losses.append(valid_epoch_loss)
        valid_accuracies.append(valid_epoch_accuracy)
        print(f"Validation - Loss: {valid_epoch_loss:.4f}, Accuracy: {valid_epoch_accuracy:.2f}%")

        # Update the best score before saving so the checkpoint records the
        # value that already includes this epoch.
        is_new_best = valid_epoch_accuracy > best_valid_acc
        if is_new_best:
            best_valid_acc = valid_epoch_accuracy
            best_model_wts = copy.deepcopy(model.state_dict())

        checkpoint = {
            'epoch': epoch + 1,
            'model_state_dict': model.state_dict(),
            'optimizer_state_dict': optimizer.state_dict(),
            'train_loss': train_epoch_loss,
            'train_acc': train_epoch_accuracy,
            'valid_loss': valid_epoch_loss,
            'valid_acc': valid_epoch_accuracy,
            'best_valid_acc': best_valid_acc
        }
        checkpoint_path = os.path.join(save_dir, f'emotion_recognition_image_only_epoch_{epoch+1}.pth')
        torch.save(checkpoint, checkpoint_path)
        print(f"Model saved at: {checkpoint_path}")

        if is_new_best:
            print(f"New best model saved with accuracy: {best_valid_acc:.2f}%")

    model.load_state_dict(best_model_wts)
    print(f"Training complete! Best validation accuracy: {best_valid_acc:.2f}%")

    history = {
        'train_loss': train_losses,
        'train_acc': train_accuracies,
        'valid_loss': valid_losses,
        'valid_acc': valid_accuracies,
        'best_acc': best_valid_acc
    }

    return model, history


def _run_epoch(model, dataloader, criterion, device, optimizer=None, log_every=20):
    """Run one pass over ``dataloader``; train when ``optimizer`` is given, else evaluate.

    Batches that raise are reported and skipped (best effort). The mean loss is
    taken over the batches that succeeded.

    Returns:
        Tuple ``(mean_loss, accuracy_percent)``; ``(nan, 0.0)`` when no batch
        could be processed.
    """
    is_training = optimizer is not None
    phase = 'training' if is_training else 'validation'

    loss_sum = 0.0
    batches_done = 0
    correct_predictions = 0
    total_predictions = 0

    # Gradients are only tracked while training.
    with torch.set_grad_enabled(is_training):
        for i, batch in enumerate(dataloader):
            try:
                if is_training:
                    optimizer.zero_grad()

                outputs, emotion_labels = process_batch(model, batch, device)
                loss = criterion(outputs, emotion_labels)

                if is_training:
                    loss.backward()
                    optimizer.step()

                batch_loss = loss.item()
                batch_size = emotion_labels.size(0)
                batch_correct = (outputs.argmax(dim=1) == emotion_labels).sum().item()

                # Accumulate only after every step of the batch succeeded.
                loss_sum += batch_loss
                batches_done += 1
                correct_predictions += batch_correct
                total_predictions += batch_size

                if is_training and i % log_every == 0:
                    batch_accuracy = 100 * batch_correct / batch_size
                    print(f"Batch {i}, Loss: {batch_loss:.4f}, Accuracy: {batch_accuracy:.2f}%")

            except Exception as e:
                # Deliberate best effort: report the failing batch and move on.
                print(f"Error in {phase} batch {i}: {e}")
                continue

    if batches_done == 0 or total_predictions == 0:
        print(f"Warning: no {phase} batch was processed successfully")
        return float('nan'), 0.0

    return loss_sum / batches_done, 100 * correct_predictions / total_predictions


if __name__ == "__main__":
    # Run configuration (these names are reused by later cells).
    data_root = "dataset"
    num_emotion_classes = 8
    device = 'cpu'
    if torch.cuda.is_available():
        device = 'cuda'

    # Training and validation splits; only the training loader is shuffled.
    train_dataset = EmoSet(data_root, num_emotion_classes, 'train')
    valid_dataset = EmoSet(data_root, num_emotion_classes, 'val')
    train_dataloader = DataLoader(train_dataset, batch_size=32, shuffle=True)
    valid_dataloader = DataLoader(valid_dataset, batch_size=32, shuffle=False)

    # Model, loss and optimizer.
    model = EmotionRecognitionModel(num_emotions=num_emotion_classes, hidden_dim=128)
    model = model.to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(model.parameters(), lr=0.001)

    # Train for ten epochs, writing one checkpoint per epoch.
    trained_model, history = train_and_validate(
        model=model,
        train_dataloader=train_dataloader,
        valid_dataloader=valid_dataloader,
        criterion=criterion,
        optimizer=optimizer,
        num_epochs=10,
        device=device,
        save_dir='checkpoints',
    )

    # Persist the weights of the model returned by training.
    best_weights = trained_model.state_dict()
    torch.save(best_weights, "emotion_recognition_image_only_best.pth")




Epoch 1/10
------------------------------
Training...
Batch 0, Loss: 2.1188, Accuracy: 12.50%
Batch 20, Loss: 2.3041, Accuracy: 25.00%
Batch 40, Loss: 2.1736, Accuracy: 12.50%
Batch 60, Loss: 1.9714, Accuracy: 15.62%
Batch 80, Loss: 1.9335, Accuracy: 18.75%
Batch 100, Loss: 1.7839, Accuracy: 21.88%
Batch 120, Loss: 2.0235, Accuracy: 21.88%
Batch 140, Loss: 1.6135, Accuracy: 31.25%
Batch 160, Loss: 1.9593, Accuracy: 28.12%
Batch 180, Loss: 1.8112, Accuracy: 34.38%
Batch 200, Loss: 1.9929, Accuracy: 21.88%
Batch 220, Loss: 1.7863, Accuracy: 28.12%
Batch 240, Loss: 1.8841, Accuracy: 21.88%
Batch 260, Loss: 1.6894, Accuracy: 34.38%
Batch 280, Loss: 1.9629, Accuracy: 18.75%
Batch 300, Loss: 1.5842, Accuracy: 50.00%
Batch 320, Loss: 1.9542, Accuracy: 25.00%
Batch 340, Loss: 1.6473, Accuracy: 37.50%
Batch 360, Loss: 1.7395, Accuracy: 31.25%
Batch 380, Loss: 1.9867, Accuracy: 21.88%
Batch 400, Loss: 1.4335, Accuracy: 53.12%
Batch 420, Loss: 1.7422, Accuracy: 34.38%
Batch 440, Loss: 1.7070, Ac

In [None]:
from sklearn.metrics import classification_report

# Load test dataset
test_dataset = EmoSet(data_root=data_root, num_emotion_classes=num_emotion_classes, phase='test')
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False)

# Load trained model
model = EmotionRecognitionModel(num_emotions=num_emotion_classes, hidden_dim=128).to(device)
checkpoint = torch.load("checkpoints/emotion_recognition_image_only_epoch_1.pth", map_location=device)
# Per-epoch checkpoints are dicts that keep the weights under 'model_state_dict';
# passing the whole dict to load_state_dict fails on key mismatch. A file that
# holds a bare state_dict is accepted as well.
state_dict = checkpoint.get('model_state_dict', checkpoint)
model.load_state_dict(state_dict)
model.eval()

# Evaluate
all_preds = []
all_labels = []

with torch.no_grad():
    for batch in test_loader:
        outputs, batch_labels = process_batch(model, batch, device)
        preds = outputs.argmax(dim=1)
        all_preds.extend(preds.cpu().tolist())
        all_labels.extend(batch_labels.cpu().tolist())

# Classification report
# NOTE(review): the name order is assumed to match the emotion 'label2idx'
# indices in info.json -- confirm against the dataset metadata.
emotion_labels = ['amusement', 'anger', 'awe', 'contentment', 'disgust', 'excitement', 'fear', 'sadness']
print("Image-Only Emotion Recognition Classification Report:")
# Explicit ``labels`` keeps the report aligned with ``target_names`` even when
# a class never appears in the test labels or predictions.
print(classification_report(
    all_labels,
    all_preds,
    labels=list(range(len(emotion_labels))),
    target_names=emotion_labels,
    zero_division=0,
))