https://paperswithcode.com/paper/facial-emotion-recognition-state-of-the-art

In [8]:
import numpy as np
import torch
from PIL import Image
from torch.utils.data import Dataset


class CustomDataset(Dataset):
    def __init__(self, images, labels, transform=None, augment=True):
        self.images = images
        self.labels = labels
        self.transform = transform

        self.augment = augment

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        if torch.is_tensor(idx):
            idx = idx.tolist()

        img = np.array(self.images[idx])

        img = Image.fromarray(img)

        if self.transform:
            img = self.transform(img)

        label = torch.tensor(self.labels[idx]).type(torch.long)
        sample = (img, label)

        return sample

In [16]:
import numpy as np
import pandas as pd
import torch
import torchvision.transforms as transforms
from torch.utils.data import DataLoader


def load_data(path='datasets/fer2013/fer2013.csv'):
    fer2013 = pd.read_csv(path)
    fer2013.columns = ['emotion', 'Usage', 'pixels']
    emotion_mapping = {0: 'Angry', 1: 'Disgust', 2: 'Fear', 3: 'Happy', 4: 'Sad', 5: 'Surprise', 6: 'Neutral'}

    return fer2013, emotion_mapping


def prepare_data(data):
    """ Prepare data for modeling
        input: data frame with labels und pixel data
        output: image and label array """

    image_array = np.zeros(shape=(len(data), 48, 48))
    image_label = np.array(list(map(int, data['emotion'])))

    for i, row in enumerate(data.index):
        image = np.fromstring(data.loc[row, 'pixels'], dtype=int, sep=' ')
        image = np.reshape(image, (48, 48))
        image_array[i] = image

    return image_array, image_label


def get_dataloaders(path=r'C:\Users\marco\OneDrive\Documenti\CV_project\ComputerVisionProject\Data\icml_face_data.csv', bs=64, augment=True):
    """ Prepare train, val, & test dataloaders
        Augment training data using:
            - cropping
            - shifting (vertical/horizental)
            - horizental flipping
            - rotation

        input: path to fer2013 csv file
        output: (Dataloader, Dataloader, Dataloader) """

    fer2013, emotion_mapping = load_data(path)

    xtrain, ytrain = prepare_data(fer2013[fer2013['Usage'] == 'Training'])
    xval, yval = prepare_data(fer2013[fer2013['Usage'] == 'PrivateTest'])
    xtest, ytest = prepare_data(fer2013[fer2013['Usage'] == 'PublicTest'])

    mu, st = 0, 255

    '''
    test_transform = transforms.Compose([
        # transforms.Scale(52),
        transforms.TenCrop(40),
        transforms.Lambda(lambda crops: torch.stack([transforms.ToTensor()(crop) for crop in crops])),
        transforms.Lambda(lambda tensors: torch.stack([transforms.Normalize(mean=(mu,), std=(st,))(t) for t in tensors])),
    ])
    '''
    test_transform = transforms.Compose([
    transforms.TenCrop(40),
    transforms.Lambda(lambda crops: torch.stack([transforms.ToTensor()(crop) for crop in crops])),
    transforms.Lambda(lambda tensors: torch.stack([transforms.Normalize(mean=(mu,), std=(st,))(t) for t in tensors])),
    transforms.Lambda(lambda tensors: torch.stack([transforms.Resize((48, 48))(t) for t in tensors])),
    ])


    if augment:
        '''
        train_transform = transforms.Compose([
            transforms.RandomResizedCrop(48, scale=(0.8, 1.2)),
            transforms.RandomApply([transforms.RandomAffine(0, translate=(0.2, 0.2))], p=0.5),
            transforms.RandomHorizontalFlip(),
            transforms.RandomApply([transforms.RandomRotation(10)], p=0.5),

            transforms.TenCrop(40),
            transforms.Lambda(lambda crops: torch.stack([transforms.ToTensor()(crop) for crop in crops])),
            transforms.Lambda(lambda tensors: torch.stack([transforms.Normalize(mean=(mu,), std=(st,))(t) for t in tensors])),
            transforms.Lambda(lambda tensors: torch.stack([transforms.RandomErasing(p=0.5)(t) for t in tensors])),
        ])
        '''
        train_transform = transforms.Compose([
        transforms.RandomResizedCrop(48, scale=(0.8, 1.2)),
        transforms.RandomApply([transforms.RandomAffine(0, translate=(0.2, 0.2))], p=0.5),
        transforms.RandomHorizontalFlip(),
        transforms.RandomApply([transforms.RandomRotation(10)], p=0.5),
        transforms.TenCrop(40),
        transforms.Lambda(lambda crops: torch.stack([transforms.ToTensor()(crop) for crop in crops])),
        transforms.Lambda(lambda tensors: torch.stack([transforms.Normalize(mean=(mu,), std=(st,))(t) for t in tensors])),
        transforms.Lambda(lambda tensors: torch.stack([transforms.RandomErasing(p=0.5)(t) for t in tensors])),
        transforms.Lambda(lambda tensors: torch.stack([transforms.Resize((48, 48))(t) for t in tensors])),
    ])


    else:
        train_transform = test_transform



    # X = np.vstack((xtrain, xval))
    # Y = np.hstack((ytrain, yval))

    train = CustomDataset(xtrain, ytrain, transform=train_transform)
    val = CustomDataset(xval, yval, transform=test_transform)  # Usa test_transform per il set di test
    test = CustomDataset(xtest, ytest, transform=test_transform)  # Usa test_transform per il set di test


    trainloader = DataLoader(train, batch_size=64, shuffle=True)
    valloader = DataLoader(val, batch_size=64, shuffle=True)
    testloader = DataLoader(test, batch_size=64, shuffle=True)

    return trainloader, valloader, testloader

In [17]:
trainloader, valloader, testloader = get_dataloaders()

In [18]:
print(trainloader.dataset[0][0].shape)
print(valloader.dataset[0][0].shape)
print(testloader.dataset[0][0].shape)

torch.Size([10, 1, 48, 48])
torch.Size([10, 1, 48, 48])
torch.Size([10, 1, 48, 48])




In [19]:
len(trainloader), len(valloader), len(testloader)

(449, 57, 57)

In [88]:
import torch.nn as nn
import torch.nn.functional as F
import torchvision


class VggFeatures(nn.Module):
    def __init__(self, drop=0.2):
        super().__init__()

        self.conv1a = nn.Conv2d(in_channels=1, out_channels=64, kernel_size=3, padding=1)
        self.conv1b = nn.Conv2d(64, out_channels=64, kernel_size=3, padding=1)

        self.conv2a = nn.Conv2d(64, 128, 3, padding=1)
        self.conv2b = nn.Conv2d(128, 128, 3, padding=1)

        self.conv3a = nn.Conv2d(128, 256, 3, padding=1)
        self.conv3b = nn.Conv2d(256, 256, 3, padding=1)

        self.conv4a = nn.Conv2d(256, 512, 3, padding=1)
        self.conv4b = nn.Conv2d(512, 512, 3, padding=1)

        self.pool = nn.MaxPool2d(kernel_size=2, stride=2)

        self.bn1a = nn.BatchNorm2d(64)
        self.bn1b = nn.BatchNorm2d(64)

        self.bn2a = nn.BatchNorm2d(128)
        self.bn2b = nn.BatchNorm2d(128)

        self.bn3a = nn.BatchNorm2d(256)
        self.bn3b = nn.BatchNorm2d(256)

        self.bn4a = nn.BatchNorm2d(512)
        self.bn4b = nn.BatchNorm2d(512)

        self.lin1 = nn.Linear(512 * 2 * 2, 4096)
        self.lin2 = nn.Linear(4096, 4096)

        self.drop = nn.Dropout(p=drop)

    def forward(self, x):
        x = F.relu(self.bn1a(self.conv1a(x)))
        x = F.relu(self.bn1b(self.conv1b(x)))
        x = self.pool(x)

        x = F.relu(self.bn2a(self.conv2a(x)))
        x = F.relu(self.bn2b(self.conv2b(x)))
        x = self.pool(x)

        x = F.relu(self.bn3a(self.conv3a(x)))
        x = F.relu(self.bn3b(self.conv3b(x)))
        x = self.pool(x)

        x = F.relu(self.bn4a(self.conv4a(x)))
        x = F.relu(self.bn4b(self.conv4b(x)))
        x = self.pool(x)
        # print(x.shape)

        x = x.view(-1, 512 * 2 * 2)
        x = F.relu(self.drop(self.lin1(x)))
        x = F.relu(self.drop(self.lin2(x)))

        return x


class Vgg(VggFeatures):
    def __init__(self, drop=0.2):
        super().__init__(drop)
        self.lin3 = nn.Linear(4096, 7)

    def forward(self, x):
        x = super().forward(x)
        x = self.lin3(x)
        return x

In [24]:
##### GPT ######

import torch
import torch.nn as nn

class VGGNet(nn.Module):
    def __init__(self, num_classes=7):
        super(VGGNet, self).__init__()
        self.features = nn.Sequential(
            nn.Conv2d(1, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(256, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),

            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.Conv2d(512, 512, kernel_size=3, padding=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )
        self.classifier = nn.Sequential(
            nn.Linear(512 * 6 * 6, 4096),  # Adatta questa dimensione in base alle dimensioni del tuo output dalla parte features
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, 4096),
            nn.ReLU(inplace=True),
            nn.Dropout(),
            nn.Linear(4096, num_classes),
        )

    def forward(self, x):
        # Se l'input ha dimensione 5D, ridimensiona prima di passare attraverso la parte convoluzionale
        if x.dim() == 5:
            batch_size, num_crops, channels, height, width = x.size()
            x = x.view(-1, channels, height, width)

        x = self.features(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)

        # Se l'input ha dimensione 5D, ridimensiona l'output finale
        if batch_size > 1 and num_crops > 1:
            x = x.view(batch_size, num_crops, -1).mean(1)  # Media sulle dimensioni dei crop

        return x


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
from sklearn.metrics import accuracy_score, precision_score, recall_score
from tqdm import tqdm

# Impostazioni
learning_rate = 0.01
n_epochs = 10

# Modello
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
net = Vgg(drop=0.2)
net.to(device)

# Ottimizzatore e criterio
optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9, nesterov=True, weight_decay=0.0001)
criterion = nn.CrossEntropyLoss()

# Scheduler
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.75, patience=5, verbose=True)

# Training loop
best_accuracy = 0.0
for epoch in range(n_epochs):
    net.train()
    running_loss = 0.0
    all_labels = []
    all_predictions = []

    for inputs, labels in tqdm(trainloader, desc=f'Epoch {epoch + 1}/{n_epochs}', leave=False):
        inputs, labels = inputs.to(device), labels.to(device)

        optimizer.zero_grad()

        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()

        _, predictions = torch.max(outputs, 1)
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predictions.cpu().numpy())

    # Calcolo delle metriche di allenamento
    accuracy = accuracy_score(all_labels, all_predictions)
    precision = precision_score(all_labels, all_predictions, average='macro')
    recall = recall_score(all_labels, all_predictions, average='macro')

    # Calcolo delle metriche di validazione
    net.eval()
    val_labels = []
    val_predictions = []
    val_loss = 0.0

    with torch.no_grad():
        for inputs, labels in valloader:
            inputs, labels = inputs.to(device), labels.to(device)

            outputs = net(inputs)
            val_loss += criterion(outputs, labels).item()

            _, predictions = torch.max(outputs, 1)
            val_labels.extend(labels.cpu().numpy())
            val_predictions.extend(predictions.cpu().numpy())

    val_accuracy = accuracy_score(val_labels, val_predictions)
    val_precision = precision_score(val_labels, val_predictions, average='macro')
    val_recall = recall_score(val_labels, val_predictions, average='macro')

    # Aggiornamento dello scheduler
    scheduler.step(val_accuracy)

    # Salvataggio del modello se l'accuracy di validazione è migliorata
    if val_accuracy > best_accuracy:
        best_accuracy = val_accuracy
        torch.save(net.state_dict(), r'C:\Users\marco\OneDrive\Documenti\CV_project\ComputerVisionProject\Models\paper2_models\model0.pth')

    # Stampa delle metriche
    print(f"Epoch {epoch + 1}/{n_epochs} - Loss: {running_loss / len(trainloader):.4f} - "
          f"Accuracy: {accuracy:.4f} - Precision: {precision:.4f} - Recall: {recall:.4f} - "
          f"Val Loss: {val_loss / len(valloader):.4f} - "
          f"Val Accuracy: {val_accuracy:.4f} - Val Precision: {val_precision:.4f} - Val Recall: {val_recall:.4f}")

In [25]:
import torch
import torch.optim as optim
from torch.optim.lr_scheduler import ReduceLROnPlateau
import torch.nn.functional as F
from sklearn.metrics import accuracy_score, precision_score, recall_score
from tqdm import tqdm

model = VGGNet()

# Funzioni per calcolare accuracy, precision e recall
def calculate_metrics(predictions, labels):
    accuracy = accuracy_score(labels, predictions)
    precision = precision_score(labels, predictions, average=None)
    recall = recall_score(labels, predictions, average=None)
    return accuracy, precision, recall

# Funzione per l'allenamento del modello
def train(model, trainloader, valloader, epochs, learning_rate, optimizer, scheduler):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    
    criterion = nn.CrossEntropyLoss()

    for epoch in range(epochs):
        model.train()
        running_loss = 0.0
        predictions_list = []
        labels_list = []

        for inputs, labels in tqdm(trainloader, desc=f'Epoch {epoch + 1}/{epochs} - Training'):
            inputs, labels = inputs.to(device), labels.to(device)

            optimizer.zero_grad()

            outputs = model(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            running_loss += loss.item()

            _, predictions = torch.max(outputs, 1)
            predictions_list.extend(predictions.cpu().numpy())
            labels_list.extend(labels.cpu().numpy())

        # Calcola e stampa le metriche di allenamento
        accuracy_train, precision_train, recall_train = calculate_metrics(predictions_list, labels_list)
        print(f'Training Accuracy: {accuracy_train:.4f}')
        print(f'Training Precision: {precision_train}')
        print(f'Training Recall: {recall_train}')

        # Valutazione sul set di validazione
        model.eval()
        predictions_list = []
        labels_list = []

        with torch.no_grad():
            for inputs, labels in tqdm(valloader, desc=f'Epoch {epoch + 1}/{epochs} - Validation'):
                inputs, labels = inputs.to(device), labels.to(device)

                outputs = model(inputs)
                _, predictions = torch.max(outputs, 1)
                predictions_list.extend(predictions.cpu().numpy())
                labels_list.extend(labels.cpu().numpy())

        # Calcola e stampa le metriche di validazione
        accuracy_val, precision_val, recall_val = calculate_metrics(predictions_list, labels_list)
        print(f'Validation Accuracy: {accuracy_val:.4f}')
        print(f'Validation Precision: {precision_val}')
        print(f'Validation Recall: {recall_val}')

        # Aggiorna il rate del learning scheduler
        scheduler.step(accuracy_val)

    print("Training complete.")

# Parametri di allenamento
learning_rate = 0.01
epochs = 3
optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=0.9, nesterov=True, weight_decay=0.0001)
scheduler = ReduceLROnPlateau(optimizer, mode='max', factor=0.75, patience=5, verbose=True)

# Chiama la funzione di allenamento
train(model, trainloader, valloader, epochs, learning_rate, optimizer, scheduler)

Epoch 1/3 - Training:   0%|          | 0/449 [00:05<?, ?it/s]


RuntimeError: mat1 and mat2 shapes cannot be multiplied (640x512 and 18432x4096)