Creating a CNN for Arousal Prediction

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader, Subset
import pytorch_lightning as pl
import torch.nn as nn
import torch.optim as optim
from pytorch_lightning import Trainer
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import torch
import torch.nn as nn
import torch.optim as optim
import pytorch_lightning as pl
import torchmetrics



## Custom Dataset

In [None]:
class TensorDataset(Dataset):
    def __init__(self, data_dir, max_frames=115):
        self.data_dir = data_dir
        self.max_frames = max_frames

        # Ottieni tutti i file di tensori
        all_tensor_files = [f for f in os.listdir(data_dir) if f.endswith('.npy')]

        # Filtra i file con abbastanza frame
        self.tensor_files = []
        for file_name in all_tensor_files:
            file_path = os.path.join(data_dir, file_name)
            tensor = np.load(file_path, mmap_mode='r')  # Usa mmap per migliorare la velocità
            if tensor.shape[0] >= self.max_frames:
                self.tensor_files.append(file_name)

        print(f"Loaded {len(self.tensor_files)} of {len(all_tensor_files)} files from {data_dir} (filtered out {len(all_tensor_files) - len(self.tensor_files)} files)")

    def __len__(self):
        return len(self.tensor_files)

    def __getitem__(self, idx):
        file_name = self.tensor_files[idx]
        file_path = os.path.join(self.data_dir, file_name)

        # Carica il tensore
        tensor = np.load(file_path, mmap_mode='r')
        tensor = torch.tensor(tensor, dtype=torch.float32)

        # Trimma se necessario
        if tensor.shape[0] > self.max_frames:
            tensor = tensor[:self.max_frames]    

        # Estrai etichette dal nome file
        file_parts = file_name.split('_')
        arousal = float(file_parts[-4])
        arousal = 0 if arousal < 5 else 1
        labels = torch.tensor([arousal], dtype=torch.float32)

        return tensor, labels, file_name


# Definition of the CNN

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import pytorch_lightning as pl

class EmotionCNN(pl.LightningModule):
    def __init__(self, input_shape):
        """
        input_shape: torch.Size([131, 478, 3])
            - 131: altezza (o numero di frame)
            - 478: larghezza (numero di landmark)
            - 3: canali (coordinate x, y, z)
        """
        super().__init__()
        print(f"Model initialized with input_shape: {input_shape}")

        # Impostazioni di base
        self.height = input_shape[0]   # 131
        self.width = input_shape[1]    # 478
        self.channels = input_shape[2] # 3

        # Layer convoluzionali
        self.conv1 = nn.Conv2d(in_channels=self.channels, out_channels=32, kernel_size=3, padding=1)
        self.conv2 = nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, padding=1)
        self.conv3 = nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, padding=1)
        self.pool = nn.MaxPool2d(2, 2)

        # Layer Fully Connected (inizializzato con dimensioni fisse, corretto dopo)
        self.fc1 = nn.Linear(1, 512)  # Placeholder, sarà corretto nel forward
        self.fc2 = nn.Linear(512, 256)
        self.fc_out = nn.Linear(256, 1)

        self.relu = nn.ReLU()
        self.loss_fn = nn.BCEWithLogitsLoss()

    def forward(self, x):
        batch_size = x.size(0)
        device = x.device  # Assicura che sia su GPU o CPU in modo corretto

        # Cambiamo l'ordine delle dimensioni: [batch, channels, height, width]
        x = x.permute(0, 3, 1, 2).contiguous()

        # Passaggio attraverso i layer convoluzionali
        x = self.pool(self.relu(self.conv1(x)))
        x = self.pool(self.relu(self.conv2(x)))
        x = self.pool(self.relu(self.conv3(x)))

        
        fc_input_dim = x.shape[1] * x.shape[2] * x.shape[3]

        if self.fc1.in_features != fc_input_dim:
            self.fc1 = nn.Linear(fc_input_dim, 512).to(device)

        # Flatten
        x = x.view(batch_size, -1)

        # Layer Fully Connected
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc_out(x)

        return x

    def training_step(self, batch, batch_idx):
        x, y, _ = batch

        # Assicuriamoci che input e target siano sullo stesso device
        device = self.device
        x, y = x.to(device), y.to(device)

        # Assicuriamoci che y sia float per la BCEWithLogitsLoss
        y = y.float()

        y_pred = self(x)
        loss = self.loss_fn(y_pred, y)
        self.log("train_loss", loss)
        return loss

    def configure_optimizers(self):
        return optim.Adam(self.parameters(), lr=0.001)


# Leave-One-Out Training

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, ConcatDataset
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

def leave_one_out_training(data_dir, max_epochs=10):
    users = sorted(os.listdir(data_dir))
    all_fold_metrics = []

    for i, user in enumerate(users):
        print(f"\n=== Leave-One-Out Fold {i+1}/{len(users)} ===")
        test_user_path = os.path.join(data_dir, user, 'tensors')
        train_users = [u for u in users if u != user]

        train_datasets = []
        for train_user in train_users:
            user_tensors_path = os.path.join(data_dir, train_user, 'tensors')
            if os.path.exists(user_tensors_path):
                dataset = TensorDataset(user_tensors_path)
                if len(dataset) > 0:
                    train_datasets.append(dataset)
                else:
                    print(f"Warning: Nessun campione valido per {train_user}")

        if not train_datasets:
            print(f"Skipping fold {i+1} - no training samples found")
            continue

        train_dataset = ConcatDataset(train_datasets)
        test_dataset = TensorDataset(test_user_path)
        if len(test_dataset) == 0:
            print(f"Skipping fold {i+1} - no test samples found for user {user}")
            continue

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, drop_last=True, num_workers=0)
        test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0)

        sample, _, _ = train_datasets[0][0]
        input_shape = sample.shape
        print(f"Sample shape: {input_shape}")

        model = EmotionCNN(input_shape)
    
        trainer = Trainer(
            accelerator="auto",
            devices=1,
            max_epochs=max_epochs,
            logger=True,
            enable_checkpointing=True,
            callbacks=[pl.callbacks.EarlyStopping(monitor='train_loss', patience=3)],
            enable_progress_bar=True
        )

        trainer.fit(model, train_loader) 

        model.eval()
        all_preds, all_labels = [], []
        with torch.no_grad():
            for batch_idx, (test_sample, test_label, test_file) in enumerate(test_loader):
                print(next(model.parameters()).device)  # Dovrebbe stampare cuda:0
                print(test_sample.device)  # Dovrebbe stampare cuda:0

                print(f"Batch {batch_idx}: test_sample shape {test_sample.shape}")

                if test_sample.nelement() == 0:
                    print(f"Warning: test_sample è vuoto! Salto questo batch.")
                    continue
                
                if test_sample.dim() == 3:
                    test_sample = test_sample.unsqueeze(0).permute(0, 3, 1, 2).contiguous()
                
                test_pred = model(test_sample)

                all_preds.append(test_pred.cpu())
                all_labels.append(test_label.cpu())
                print(f"Test file: {test_file[0]}")
                print(f"Prediction: {test_pred.cpu().numpy()}, True label: {test_label.cpu().numpy()}")

        if all_preds and all_labels:
            all_preds = torch.cat(all_preds, dim=0).numpy()
            all_labels = torch.cat(all_labels, dim=0).numpy()

            mse = mean_squared_error(all_preds, all_labels)
            mae = mean_absolute_error(all_preds, all_labels)
            r2 = r2_score(all_labels, all_preds)

            print(f"Metrics for Fold {i+1}:")
            print(f"  MSE: {mse:.4f}")
            print(f"  MAE: {mae:.4f}")
            print(f"  R² Score: {r2:.4f}")

            all_fold_metrics.append({'fold': i+1, 'user': user, 'mse': mse, 'mae': mae, 'r2': r2})

    if all_fold_metrics:
        avg_mse = sum(fold['mse'] for fold in all_fold_metrics) / len(all_fold_metrics)
        avg_mae = sum(fold['mae'] for fold in all_fold_metrics) / len(all_fold_metrics)
        avg_r2 = sum(fold['r2'] for fold in all_fold_metrics) / len(all_fold_metrics)

        print("\n=== Overall Cross-Validation Results ===")
        print(f"Average MSE: {avg_mse:.4f}")
        print(f"Average MAE: {avg_mae:.4f}")
        print(f"Average R² Score: {avg_r2:.4f}")

    return all_fold_metrics


# Test another CNN

In [None]:
import os
import numpy as np
import torch
from torch.utils.data import DataLoader, ConcatDataset
import pytorch_lightning as pl
from pytorch_lightning import Trainer
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score

def leave_one_out_training(data_dir, max_epochs=10):
    torch.cuda.empty_cache()
    
    users = sorted(os.listdir(data_dir))
    all_fold_metrics = []

    for i, user in enumerate(users):
        print(f"\n=== Leave-One-Out Fold {i+1}/{len(users)} ===")
        test_user_path = os.path.join(data_dir, user, 'tensors')
        train_users = [u for u in users if u != user]

        train_datasets = []
        for train_user in train_users:
            user_tensors_path = os.path.join(data_dir, train_user, 'tensors')
            if os.path.exists(user_tensors_path):
                dataset = TensorDataset(user_tensors_path)
                if len(dataset) > 0:
                    train_datasets.append(dataset)
                else:
                    print(f"Warning: Nessun campione valido per {train_user}")

        if not train_datasets:
            print(f"Skipping fold {i+1} - no training samples found")
            continue

        train_dataset = ConcatDataset(train_datasets)
        test_dataset = TensorDataset(test_user_path)
        if len(test_dataset) == 0:
            print(f"Skipping fold {i+1} - no test samples found for user {user}")
            continue

        train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, drop_last=True, num_workers=0)
        test_loader = DataLoader(test_dataset, batch_size=1, shuffle=False, num_workers=0)

        model = Model()
    
        trainer = Trainer(
            accelerator="auto",
            devices=1,
            max_epochs=max_epochs,
            logger=True,
            enable_checkpointing=True,
            callbacks=[pl.callbacks.EarlyStopping(monitor='train_loss', patience=3)],
            enable_progress_bar=True
        )

        trainer.fit(model, train_loader) 

        model.eval()
        all_preds, all_labels = [], []
        with torch.no_grad():
            for batch_idx, (test_sample, test_label, test_file) in enumerate(test_loader):
                print(next(model.parameters()).device)  # Dovrebbe stampare cuda:0
                print(test_sample.device)  # Dovrebbe stampare cuda:0

                print(f"Batch {batch_idx}: test_sample shape {test_sample.shape}")

                if test_sample.nelement() == 0:
                    print(f"Warning: test_sample è vuoto! Salto questo batch.")
                    continue
                
                if test_sample.dim() == 3:
                    test_sample = test_sample.unsqueeze(0).permute(0, 3, 1, 2).contiguous()
                
                test_pred = model(test_sample)

                all_preds.append(test_pred.cpu())
                all_labels.append(test_label.cpu())
                print(f"Test file: {test_file[0]}")
                print(f"Prediction: {test_pred.cpu().numpy()}, True label: {test_label.cpu().numpy()}")

        if all_preds and all_labels:
            all_preds = torch.cat(all_preds, dim=0).numpy()
            all_labels = torch.cat(all_labels, dim=0).numpy()

            mse = mean_squared_error(all_preds, all_labels)
            mae = mean_absolute_error(all_preds, all_labels)
            r2 = r2_score(all_labels, all_preds)

            print(f"Metrics for Fold {i+1}:")
            print(f"  MSE: {mse:.4f}")
            print(f"  MAE: {mae:.4f}")
            print(f"  R² Score: {r2:.4f}")

            all_fold_metrics.append({'fold': i+1, 'user': user, 'mse': mse, 'mae': mae, 'r2': r2})

    if all_fold_metrics:
        avg_mse = sum(fold['mse'] for fold in all_fold_metrics) / len(all_fold_metrics)
        avg_mae = sum(fold['mae'] for fold in all_fold_metrics) / len(all_fold_metrics)
        avg_r2 = sum(fold['r2'] for fold in all_fold_metrics) / len(all_fold_metrics)

        print("\n=== Overall Cross-Validation Results ===")
        print(f"Average MSE: {avg_mse:.4f}")
        print(f"Average MAE: {avg_mae:.4f}")
        print(f"Average R² Score: {avg_r2:.4f}")

    return all_fold_metrics


# Start the Leave-One-Out Training

In [None]:
data_dir = "data"
leave_one_out_training(data_dir, max_epochs=10)

In [None]:
import os
import numpy as np
import pandas as pd

def analyze_tensor_dimensions_for_all_users(base_dir, output_csv='tensor_dimensions_all_users.csv'):
    tensor_data = []

    # Itera su tutti gli utenti
    for user in os.listdir(base_dir):
        user_dir = os.path.join(base_dir, user)
        if not os.path.isdir(user_dir):  # Ignora se non è una cartella
            continue
        
        # Trova la cartella "tensors" all'interno di ogni utente
        tensors_dir = os.path.join(user_dir, 'tensors')
        if not os.path.isdir(tensors_dir):
            continue

        # Lista tutti i file .npy
        tensor_files = [f for f in os.listdir(tensors_dir) if f.endswith('.npy')]

        for file_name in tensor_files:
            file_path = os.path.join(tensors_dir, file_name)
            
            # Carica il tensore
            tensor = np.load(file_path)
            
            # Ottieni la forma del tensore (numero di frame, landmarks, 3D)
            tensor_shape = tensor.shape
            
            # Aggiungi i dati al dataframe
            tensor_data.append({
                'user': user,
                'file_name': file_name,
                'num_frames': tensor_shape[0],
                'num_landmarks': tensor_shape[1],
                'num_coordinates': tensor_shape[2]
            })
            
            print(f"User: {user}, File: {file_name}, Shape: {tensor_shape}")

    # Crea un DataFrame con i dati raccolti
    df = pd.DataFrame(tensor_data)
    
    # Salva il DataFrame in un file CSV
    df.to_csv(output_csv, index=False)
    
    print(f"\n--- Dati salvati in {output_csv} ---")
    print(df.head())

# Esegui l'analisi su tutti gli utenti
base_dir = "data"
analyze_tensor_dimensions_for_all_users(base_dir)


In [None]:
#analyze the balance of the dataset
import matplotlib.pyplot as plt
import pandas as pd

def analyze_dataset_balance(data_dir):
    tensor_data = []

    # Itera su tutti gli utenti
    for user in os.listdir(data_dir):
        user_dir = os.path.join(data_dir, user)
        if not os.path.isdir(user_dir):  # Ignora se non è una cartella
            continue
        
        # Trova la cartella "tensors" all'interno di ogni utente
        tensors_dir = os.path.join(user_dir, 'tensors')
        if not os.path.isdir(tensors_dir):
            continue

        for f in os.listdir(tensors_dir):
            if f.endswith('.npy'):
                tensor_data.append({
                    'user': user,
                    'file_name': f,
                    'arousal': 1 if float(f.split('_')[-4]) > 5 else 0
                })
                
    # Crea un DataFrame con i dati raccolti
    df = pd.DataFrame(tensor_data)
    
    # Conta i valori unici
    counts = df['arousal'].value_counts()
    print(counts)

analyze_dataset_balance("data")    
    
    