In [3]:
# !pip install -U upgrade pip
# !pip install -q pytorch_lightning
# !pip install -q ipywidgets
# !pip install ultralytics

In [4]:
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping

from torch import nn, optim
from torch.optim.lr_scheduler import StepLR
import torch 
from torch.utils.data import DataLoader, Subset, Dataset, random_split

from torchvision.models.efficientnet import EfficientNet_V2_S_Weights
from torchvision.transforms.functional import InterpolationMode, to_tensor
from torchvision import models, transforms, datasets
from torchvision import datasets, transforms

import torchmetrics
from torchmetrics import ConfusionMatrix, Precision, Recall, F1Score

from collections import Counter
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
import io
from PIL import Image

In [5]:
# Ajuste de carpeta no utilizada

# # Copiar a directorio de trabajo 
# !cp -a /kaggle/input/young-affectnet-hq /kaggle/working/young-affectnet-hq
# # Eliminar la carpeta contempt del directorio copiado (conservando el resto)
# !rm -r /kaggle/working/young-affectnet-hq/contempt
# # Cambiar el nombre de la carpeta anger por angry
# !mv /kaggle/working/young-affectnet-hq/anger /kaggle/working/young-affectnet-hq/angry

import os
import shutil

# Directorio fuente y destino
source_dir = "/kaggle/input/young-affectnet-hq"
target_dir = "/kaggle/working/young-affectnet-hq"

# Comprobar si el directorio ya ha sido copiado
if not os.path.exists(target_dir):
    # Copiar el directorio
    shutil.copytree(source_dir, target_dir)
    print(f"Directorio copiado: {target_dir}")
else:
    print(f"El directorio ya existe: {target_dir}")

# Comprobar si la carpeta 'contempt' existe y eliminarla
contempt_dir = os.path.join(target_dir, "contempt")
if os.path.exists(contempt_dir):
    shutil.rmtree(contempt_dir)
    print(f"Carpeta eliminada: {contempt_dir}")
else:
    print(f"La carpeta no existe o ya fue eliminada: {contempt_dir}")

# Comprobar si la carpeta 'anger' necesita ser renombrada a 'angry'
anger_dir = os.path.join(target_dir, "anger")
angry_dir = os.path.join(target_dir, "angry")

if os.path.exists(anger_dir) and not os.path.exists(angry_dir):
    os.rename(anger_dir, angry_dir)
    print(f"Carpeta renombrada de 'anger' a 'angry'")
else:
    if not os.path.exists(anger_dir):
        print("La carpeta 'anger' no existe.")
    if os.path.exists(angry_dir):
        print("La carpeta 'angry' ya existe.")


Directorio copiado: /kaggle/working/young-affectnet-hq
Carpeta eliminada: /kaggle/working/young-affectnet-hq/contempt
Carpeta renombrada de 'anger' a 'angry'


# Creacion de los datamodules 

In [6]:
class CustomDataset(Dataset):
    def __init__(self, subset, transform=None):
        self.subset = subset
        self.transform = transform

    def __getitem__(self, index):
        x, y = self.subset[index]
        if self.transform:
            x = self.transform(x)
        return x, y

    def __len__(self):
        return len(self.subset)
    
def get_weights(dataset_path):
    class_counts = {}
    for clase in os.listdir(f'{dataset_path}'):
        class_counts[clase] = len(os.listdir(f'{dataset_path}/{clase}'))
        
    # Total de muestras
    total_samples = sum(class_counts.values())

    # Calcular los pesos inversos de la frecuencia de las clases
    class_weights = {cls: total_samples / count for cls, count in class_counts.items()}

    # Normalizar los pesos para que el menor sea 1.0
    min_weight = min(class_weights.values())
    normalized_weights = {cls: weight / min_weight for cls, weight in class_weights.items()}

    # Convertir los pesos a un tensor para usar en PyTorch
    weights_tensor = torch.tensor(list(normalized_weights.values()), dtype=torch.float32)

    return weights_tensor

## Fer2013 datamodule

In [7]:
class FER2013DataModule(pl.LightningDataModule):
    def __init__(self, train_dir: str, test_dir: str, batch_size: int = 128):
        super().__init__()
        
        self.train_dir = train_dir
        self.test_dir = test_dir
        self.batch_size = batch_size
        
        self.train_transforms = transforms.Compose([
            transforms.Resize(256, interpolation=InterpolationMode.BILINEAR),
            transforms.CenterCrop(224),
            transforms.Grayscale(num_output_channels=3),  # Convert 1 channel grayscale to 3 channel grayscale
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        
        self.val_transforms = transforms.Compose([
            transforms.Resize(256, interpolation=InterpolationMode.BILINEAR),
            transforms.CenterCrop(224),
            transforms.Grayscale(num_output_channels=3),  # Convert 1 channel grayscale to 3 channel grayscale
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        
        self.test_transforms = transforms.Compose([
            transforms.Resize(256, interpolation=InterpolationMode.BILINEAR),
            transforms.CenterCrop(224),
            transforms.Grayscale(num_output_channels=3),  # Convert 1 channel grayscale to 3 channel grayscale
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def setup(self, stage: str = None):
        if stage == 'fit' or stage is None:
            full_dataset = datasets.ImageFolder(self.train_dir)
            train_size = int(0.85 * len(full_dataset))
            val_size = len(full_dataset) - train_size
    
            train_indices, val_indices = random_split(range(len(full_dataset)), 
                                                      [train_size, val_size], 
                                                      generator=torch.Generator().manual_seed(42))
            
            train_subset = Subset(full_dataset, train_indices)
            val_subset = Subset(full_dataset, val_indices)
            
            self.train_ds = CustomDataset(train_subset, transform=self.train_transforms)
            self.val_ds = CustomDataset(val_subset, transform=self.val_transforms)

        if stage == 'test' or stage is None:
            self.test_ds = datasets.ImageFolder(self.test_dir, transform=self.test_transforms)
    
    def get_n_classes(self):
        return len(self.full_dataset.classes)
    
    def get_weights(self):
        # Calcular y retornar los pesos de las clases
        return get_weights(self.train_dir)
    
    def train_dataloader(self):
        return DataLoader(self.train_ds, batch_size=self.batch_size, shuffle=True, num_workers=4)

    def val_dataloader(self):
        return DataLoader(self.val_ds, batch_size=self.batch_size, shuffle=False, num_workers=4)

    def test_dataloader(self):
        return DataLoader(self.test_ds, batch_size=self.batch_size, shuffle = False, num_workers=4)
    
    
# fer2013_ds = FER2013DataModule(train_dir = '/kaggle/input/fer2013/train', test_dir = '/kaggle/input/fer2013/test', batch_size = 64)

## Young affectnet hq 

In [8]:
class YoungAffectNetDataModule(pl.LightningDataModule):
    def __init__(self, imgs_dir: str, batch_size: int = 128):
        super().__init__()
        self.imgs_dir = imgs_dir
        self.batch_size = batch_size
        
        self.full_dataset = None
        self.train_ds = None
        self.val_ds = None
        self.test_ds = None
        
        self.train_transforms = transforms.Compose([
#             transforms.RandomResizedCrop(224),
#             transforms.RandomHorizontalFlip(),
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        
        self.val_transforms = transforms.Compose([
            transforms.Resize(256),
            transforms.CenterCrop(224),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        
    def prepare_data(self):
        self.full_dataset = datasets.ImageFolder(self.imgs_dir)
        train_size = int(0.7 * len(self.full_dataset))
        val_size = int(0.2 * len(self.full_dataset))
        test_size = len(self.full_dataset) - train_size - val_size

        self.train_indices, self.val_indices, self.test_indices = random_split(
            range(len(self.full_dataset)), 
            [train_size, val_size, test_size], 
            generator=torch.Generator().manual_seed(42)
        )


    def setup(self, stage: str = None):
        if not self.full_dataset:
            self.prepare_data() 
            
        if stage == 'fit' or stage is None:
            train_subset = Subset(self.full_dataset, self.train_indices)
            val_subset = Subset(self.full_dataset, self.val_indices)
            
            self.train_ds = CustomDataset(train_subset, transform=self.train_transforms)
            self.val_ds = CustomDataset(val_subset, transform=self.val_transforms)
            
        if stage == 'test' or stage is None:
            test_subset = Subset(self.full_dataset, self.test_indices)
            self.test_ds = CustomDataset(test_subset, transform=self.val_transforms)
    
    def get_weights(self):
        if not self.full_dataset:
            self.prepare_data()
        
        # Calcular y retornar los pesos de las clases
        return get_weights(self.imgs_dir)
    
    def get_n_classes(self):
        print(self.full_dataset.classes)
        return len(self.full_dataset.classes)
    
    def train_dataloader(self):
        return DataLoader(self.train_ds, batch_size=self.batch_size, shuffle=True, num_workers=4)

    def val_dataloader(self):
        return DataLoader(self.val_ds, batch_size=self.batch_size, shuffle=False, num_workers=4)

    def test_dataloader(self):
        return DataLoader(self.test_ds, batch_size=self.batch_size, shuffle=False, num_workers=4)

    
# Luego, para usarlo en el entrenador:
# young_affectnet_ds = YoungAffectNetDataModule(imgs_dir='/kaggle/working/young-affectnet-hq', batch_size = 64)
# young_affectnet_ds.setup("fit")
# young_affectnet_ds.get_n_classes()

In [9]:
class FaceEmotionClassifier(pl.LightningModule):
    def __init__(self, model_name, num_classes=7, learning_rate=0.00001, pretrained = True, weights = None):
        super().__init__()
        
        assert model_name in ['efficientnet_b6', 'resnet50', 'mobilenet_v3_small', 'mobilenet_v3_large', 'efficientnet_v2_s']
        
        pretrained_imagenet = 'IMAGENET1K_V1' if pretrained else None
        
        if model_name == 'efficientnet_b6':
            self.model = models.efficientnet_b6(weights=pretrained_imagenet)
            
#             for param in self.model.parameters():
#                 param.requires_grad = False
            
            self.model.classifier[1] = nn.Linear(self.model.classifier[1].in_features, num_classes)
        
        if model_name == 'mobilenet_v3_small':
            self.model = models.mobilenet_v3_small(weights=pretrained_imagenet)
                        
#             for param in self.model.parameters():
#                 param.requires_grad = False
            
            self.model.classifier[3] = nn.Linear(self.model.classifier[3].in_features, num_classes)
        
        elif model_name == 'mobilenet_v3_large':
            self.model = models.mobilenet_v3_large(weights=pretrained_imagenet)
            
#             for param in self.model.parameters():
#                 param.requires_grad = False
            
            self.model.classifier[3] = nn.Linear(self.model.classifier[3].in_features, num_classes)
        
        elif model_name == 'efficientnet_v2_s':
            self.model = models.efficientnet_v2_s(weights=pretrained_imagenet)
            
#             for param in self.model.parameters():
#                 param.requires_grad = False
            
            self.model.classifier[1] = nn.Linear(self.model.classifier[1].in_features, num_classes)
        
        elif model_name == 'resnet50':
            self.model = models.resnet50(weights=pretrained_imagenet)
#           (fc): Linear(in_features=2048, out_features=1000, bias=True)
            self.model.fc = nn.Linear(self.model.fc.in_features, num_classes)
            
            
        self.criterion = nn.CrossEntropyLoss(weight=weights)
        self.learning_rate = learning_rate
        # Metrics
        self.train_acc = torchmetrics.Accuracy(task="multiclass", num_classes=num_classes)
        self.val_acc = torchmetrics.Accuracy(task="multiclass", num_classes=num_classes)
        self.test_acc = torchmetrics.Accuracy(task="multiclass", num_classes=num_classes)
        
        self.train_precision = torchmetrics.Precision(task="multiclass", average='micro', num_classes=num_classes)
#         self.val_precision = torchmetrics.Precision(task="multiclass", average='micro', num_classes=num_classes)
        self.test_precision = torchmetrics.Precision(task="multiclass", average='micro', num_classes=num_classes)
        
        self.train_recall = torchmetrics.Recall(task="multiclass", average='micro', num_classes=num_classes)
#         self.val_recall = torchmetrics.Recall(task="multiclass", average='micro', num_classes=num_classes)
        self.test_recall = torchmetrics.Recall(task="multiclass", average='micro', num_classes=num_classes)
        
        self.train_f1 = torchmetrics.F1Score(task="multiclass", num_classes=num_classes)
#         self.val_f1 = torchmetrics.F1Score(task="multiclass", num_classes=num_classes)
        self.test_f1 = torchmetrics.F1Score(task="multiclass", num_classes=num_classes)
        

    def forward(self, x):
        return self.model(x)

    def training_step(self, batch, batch_idx):
        images, labels = batch
        outputs = self(images)
        loss = self.criterion(outputs, labels)
        preds = torch.argmax(outputs, dim=1)
        acc = self.train_acc(preds, labels)
        self.log('train_loss', loss, on_step=True, on_epoch=True, prog_bar=True)
        self.log('train_acc', acc, on_step=True, on_epoch=True, prog_bar=True)
        return loss

    def validation_step(self, batch, batch_idx):
        images, labels = batch
        outputs = self(images)
        loss = self.criterion(outputs, labels)
        preds = torch.argmax(outputs, dim=1)
        acc = self.val_acc(preds, labels)
        self.log('val_loss', loss, on_step=False, on_epoch=True, prog_bar=True)
        self.log('val_acc', acc, on_step=False, on_epoch=True, prog_bar=True)
        return loss

    def test_step(self, batch, batch_idx):
        images, labels = batch
        outputs = self(images)
        loss = self.criterion(outputs, labels)
        preds = torch.argmax(outputs, dim=1)
        acc = self.test_acc(preds, labels)
        # Update metrics
        self.test_precision.update(preds, labels)
        self.test_recall.update(preds, labels)
        self.test_f1.update(preds, labels)
        self.log('test_loss', loss, on_epoch=True, prog_bar=True)
        self.log('test_acc', acc, on_epoch=True, prog_bar=True)
        return loss

    def on_test_epoch_end(self):
        # Log the confusion matrix and other metrics at the end of the test
        self.log('precision', self.test_precision.compute())
        self.log('recall', self.test_recall.compute())
        self.log('f1_score', self.test_f1.compute())
        # Reset metrics
       
        self.test_precision.reset()
        self.test_recall.reset()
        self.test_f1.reset()
        

    def configure_optimizers(self):
        optimizer = optim.Adam(self.parameters(), lr=self.learning_rate)
        lr_scheduler = {
                'scheduler': StepLR(optimizer, step_size=5, gamma=0.1),
                'name': 'learning_rate',
                'interval': 'epoch',  # 'step' para actualizar cada paso, 'epoch' para cada época
                'frequency': 1,  # Frecuencia de actualización
            }
        return [optimizer], [lr_scheduler]
    
    def print_model(self):
        print(self.model)


In [None]:
# fer2013_data_module = FER2013DataModule(train_dir = '/kaggle/input/fer2013/train', test_dir = '/kaggle/input/fer2013/test', batch_size = 32)
affectnet_data_module = YoungAffectNetDataModule(imgs_dir='/kaggle/working/young-affectnet-hq', 
                                                 batch_size = 1)
weights = affectnet_data_module.get_weights()

results = dict()

model_name_list = ['mobilenet_v3_small'] # 'mobilenet_v3_large', 'resnet50', 'mobilenet_v3_small', 'efficientnet_v2_s']
for model_name in model_name_list:
    
    
    model = FaceEmotionClassifier(model_name = model_name, num_classes = 7, learning_rate=0.0001, pretrained=True, weights = weights)
#     model.print_model()
    early_stopping = EarlyStopping(
        monitor='val_loss',  # Métrica a monitorear
        patience=5,  # Número de épocas sin mejora después de las cuales se detendrá el entrenamiento
        verbose=True,  # Para loggear cuando se detiene el entrenamiento
        mode='min',  # 'min' si la métrica debe disminuir (para pérdida), 'max' para métricas como la precisión
    )

    trainer = Trainer(min_epochs = 2,  
                      max_epochs = 10, 
                      devices=1, 
                      accelerator='gpu', 
                      callbacks = [early_stopping])#'gpu'
    print(f'Training {model_name}...')
    # Train the model
    trainer.fit(model, datamodule=affectnet_data_module)
    print(f'Testing {model_name}...')
    # Evaluate the model on the test set
    trainer.test(model, datamodule=affectnet_data_module)

    
    
    
    # Save the fine-tuned model
# trainer.save_checkpoint("/kaggle/working/fine_tuned_mobilenet_v3_small.ckpt")

In [None]:
print("Results obtained")
## FER2013
# Mobilenet V3 Large:

┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃        Test metric        ┃       DataLoader 0        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│         f1_score          │    0.6054611206054688     │
│         precision         │    0.6054611206054688     │
│          recall           │    0.6054611206054688     │
│         test_acc          │    0.6054611206054688     │
│         test_loss         │    1.3348348140716553     │
└───────────────────────────┴───────────────────────────┘

# Mobilenet V3 Small:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃        Test metric        ┃       DataLoader 0        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│         f1_score          │    0.5653385519981384     │
│         precision         │    0.5653385519981384     │
│          recall           │    0.5653385519981384     │
│         test_acc          │    0.5653385519981384     │
│         test_loss         │    1.2770402431488037     │
└───────────────────────────┴───────────────────────────┘

# Resnet 50:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃        Test metric        ┃       DataLoader 0        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│         f1_score          │    0.6518529057502747     │
│         precision         │    0.6518529057502747     │
│          recall           │    0.6518529057502747     │
│         test_acc          │    0.6518529057502747     │
│         test_loss         │     1.244256615638733     │
└───────────────────────────┴───────────────────────────┘

# Efficientnet V2 S:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃        Test metric        ┃       DataLoader 0        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│         f1_score          │    0.6779047250747681     │
│         precision         │    0.6779047250747681     │
│          recall           │    0.6779047250747681     │
│         test_acc          │    0.6779047250747681     │
│         test_loss         │     1.244596004486084     │
└───────────────────────────┴───────────────────────────┘

## AffectNet
# Mobilenet V3 Large:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃        Test metric        ┃       DataLoader 0        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│         f1_score          │    0.6053042411804199     │
│         precision         │    0.6053042411804199     │
│          recall           │    0.6053042411804199     │
│         test_acc          │    0.6053042411804199     │
│         test_loss         │     1.154691219329834     │
└───────────────────────────┴───────────────────────────┘

# Mobilenet V3 Small:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃        Test metric        ┃       DataLoader 0        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│         f1_score          │    0.5811232328414917     │
│         precision         │    0.5811232328414917     │
│          recall           │    0.5811232328414917     │
│         test_acc          │    0.5811232328414917     │
│         test_loss         │    1.0759913921356201     │
└───────────────────────────┴───────────────────────────┘

# Resnet 50:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃        Test metric        ┃       DataLoader 0        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│         f1_score          │    0.6560062170028687     │
│         precision         │    0.6560062170028687     │
│          recall           │    0.6560062170028687     │
│         test_acc          │    0.6560062170028687     │
│         test_loss         │    1.2396374940872192     │
└───────────────────────────┴───────────────────────────┘

# Efficientnet V2 S:
┏━━━━━━━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━━━━━━━━━┓
┃        Test metric        ┃       DataLoader 0        ┃
┡━━━━━━━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━━━━━━━━━┩
│         f1_score          │    0.6779047250747681     │
│         precision         │    0.6779047250747681     │
│          recall           │    0.6779047250747681     │
│         test_acc          │    0.6779047250747681     │
│         test_loss         │     1.244596004486084     │
└───────────────────────────┴───────────────────────────┘