In [1]:
import os
import sys
import time
import h5py
from typing import List, Tuple
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from matplotlib_inline.backend_inline import set_matplotlib_formats
import seaborn as sns
import torch
import torch.nn as nn
from torch.optim import Optimizer
import torchvision.models as models
import torchvision.transforms as transforms
from torch.utils.data import DataLoader, Dataset, random_split, Subset
from sklearn.metrics import accuracy_score, f1_score, recall_score, precision_score, confusion_matrix, classification_report
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset
import torch.nn.functional as F

set_matplotlib_formats("svg")
plt.style.use('ggplot')
plt.rcParams['figure.figsize'] = (10, 5)


# VIT
from transformers import ViTForImageClassification, ViTModel, AutoConfig, ViTConfig


In [2]:
config = {
    'hdf5_path': "../../data/fer_2013_processed.h5",
    'fer2013_resnet18_embeddings_path': "../../data/fer2013_resnet18_embeddings.npy",
    'fer2013_resnet18_labels_path': "../../data/fer2013_resnet18_labels.npy",
    'model_name': "resnet18",
#     'model_name': "efficientnet_b0",
#     'model_name': "vit_transformer",
    'batch_size': 32,
    'learning_rate': 0.0001,
    'num_epochs': 30,
    'device': torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
    'emotion_labels': ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral'],
    'L2_regularization': 1e-4,
    'unfreeze_layers': ['layer4', 'fc'],
    
    # Add other configurations and hyperparameters here
}

labels = np.load(config['fer2013_resnet18_labels_path'])
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(labels), y=labels)
config["class_weights"] = torch.tensor(class_weights, dtype=torch.float).to(config['device'])

In [26]:
class CustomDataset(Dataset):
    def __init__(self, hdf5_filename, transform=None):
        
        self.file = h5py.File(hdf5_filename, 'r')
        self.images = self.file['images']
        self.labels = self.file['labels']
        
        # we can add data augmentations (overfitting here)
        self.transform = transform or transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.Grayscale(num_output_channels=3),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        self.emotion_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']


    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        image = torch.tensor(self.images[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        
        if self.transform:
            image = self.transform(image)
            
        return image, label

    def close(self):
        self.file.close()

In [27]:
def create_dataloaders(data, labels=None, train_size: float = 0.7, random_state: int = 42, use_embeddings: bool = False):
    """
    DataLoader creation for both precomputed embeddings with labels and PyTorch Datasets.

    Parameters:
    - data: Embeddings as a numpy array or a PyTorch Dataset.
    - labels: Labels as a numpy array (used only if use_embeddings=True).
    - batch_size: Size of each batch for the DataLoader.
    - train_size: Proportion of the dataset to include in the train split.
    - random_state: Seed used by the random number generator for shuffling.
    - use_embeddings: Flag to indicate whether data consists of embeddings and labels (True) or a Dataset (False).

    Returns:
    - A dictionary containing DataLoader objects for 'train', 'val', and 'test' datasets.
    """
    print("creating data loaders")
    
    if use_embeddings:
        X_train, X_temp, y_train, y_temp = train_test_split(data, labels, test_size=1-train_size, random_state=random_state)
        X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=random_state)

        train_dataset = TensorDataset(torch.tensor(X_train, dtype=torch.float), torch.tensor(y_train, dtype=torch.long))
        val_dataset = TensorDataset(torch.tensor(X_val, dtype=torch.float), torch.tensor(y_val, dtype=torch.long))
        test_dataset = TensorDataset(torch.tensor(X_test, dtype=torch.float), torch.tensor(y_test, dtype=torch.long))
    else:
        val_size, test_size = (1 - train_size) * 0.5, (1 - train_size) * 0.5
        train_dataset, val_dataset, test_dataset = random_split(data, [train_size, val_size, test_size])

       
    train_loader = DataLoader(train_dataset, batch_size=config["batch_size"], shuffle=True, num_workers=4)
    val_loader = DataLoader(val_dataset, batch_size=config["batch_size"], shuffle=False, num_workers=4)
    test_loader = DataLoader(test_dataset, batch_size=config["batch_size"], shuffle=False, num_workers=4)

        
    return train_loader, val_loader, test_loader

In [28]:
def initialize_resnet_model(model_name: str = config["model_name"], num_classes: int = 7, unfreeze_layers: list = None, custom_model: nn.Module = None) -> Tuple[nn.Module, nn.Module, Optimizer]:
    """
    Initializes Resnet for training. This can be a pretrained model or a custom model for training on embeddings.

    Args:
        config (dict): Configuration dictionary containing model parameters, device info, etc.
        model_name (str): Name of the pretrained model architecture to use (e.g., 'resnet18'). If None, a custom model must be provided.
        num_classes (int): Number of classes for the output layer.
        unfreeze_layers (list): Specific layers to unfreeze for training (applicable only for pretrained models).
        custom_model (nn.Module): A custom model instance to be used for training on embeddings.

    Returns:
        Tuple[nn.Module, nn.Module, Optimizer]: Initialized model, criterion, and optimizer.
    """
    print("Initializing Resnet Model...")
    
    if custom_model:
        model = custom_model
    else:
        
        model = getattr(models, model_name)(pretrained=True)
    
        # Freeze layers if unfreeze_layers is specified; otherwise, unfreeze all
        if unfreeze_layers:
            for name, param in model.named_parameters():
                param.requires_grad = name in unfreeze_layers or any(ul in name for ul in unfreeze_layers)
        else:
            for param in model.parameters():
                param.requires_grad = True

        num_ftrs = model.fc.in_features
        model.fc = nn.Linear(num_ftrs, num_classes)

    
    model = model.to(config['device'])
    criterion = nn.CrossEntropyLoss(weight=config.get('class_weights'))
    optimizer = torch.optim.Adam(model.parameters(), lr=config['learning_rate'], weight_decay=config.get('L2_regularization', 0))

    return model, criterion, optimizer


In [29]:
def train_and_validate(model: nn.Module, train_loader: DataLoader, val_loader: DataLoader, config: dict, criterion: nn.Module, optimizer: Optimizer, num_epochs: int = config["num_epochs"]) -> Tuple[List[float], List[float], List[float], List[float]]:
    """
    Trains and validates the model.

    Args:
        model (nn.Module): The model to train and validate.
        train_loader (DataLoader): DataLoader for the training data.
        val_loader (DataLoader): DataLoader for the validation data.
        config (dict): Configuration dictionary containing training parameters.

    Returns:
        Tuple[List[float], List[float], List[float], List[float]]: Lists containing training losses, training accuracies, validation losses, and validation accuracies for each epoch.
    """
    print("Training and Validating...")
    
    train_losses, train_accuracies, val_losses, val_accuracies = [], [], [], []
    
    for epoch in range(num_epochs):
        epoch_start_time = time.time()

        model.train()
        total_loss, total_correct = 0, 0

        for images, labels in train_loader:
            images, labels = images.to(config["device"]), labels.to(config["device"])

            optimizer.zero_grad()  
            output = model(images)
    
            if isinstance(output, torch.Tensor):
                logits = output
            else:  # Assuming it's an ImageClassifierOutput or similar
                logits = output.logits
    
            loss = criterion(logits, labels)
            loss.backward()        
            optimizer.step()

            total_loss += loss.item()
            _, predicted = torch.max(logits, 1)
            total_correct += (predicted == labels).sum().item()

        avg_train_loss = total_loss / len(train_loader)
        avg_train_accuracy = total_correct / len(train_loader.dataset)
        train_losses.append(avg_train_loss)
        train_accuracies.append(avg_train_accuracy)

        # Validation
        model.eval()
        total_val_loss, total_val_correct = 0, 0
        with torch.no_grad():
            for images, labels in val_loader:
                images, labels = images.to(config["device"]), labels.to(config["device"])
                output = model(images)
                
                if isinstance(output, torch.Tensor):
                    logits = output
                else:  # Assuming it's an ImageClassifierOutput or similar
                    logits = output.logits
                    
                loss = criterion(logits, labels)
                total_val_loss += loss.item()
                _, predicted = torch.max(logits, 1)
                total_val_correct += (predicted == labels).sum().item()

        avg_val_loss = total_val_loss / len(val_loader)
        avg_val_accuracy = total_val_correct / len(val_loader.dataset)
        val_losses.append(avg_val_loss)
        val_accuracies.append(avg_val_accuracy)

        epoch_end_time = time.time()
        epoch_duration = epoch_end_time - epoch_start_time

        print(f"Epoch: {epoch+1}/{num_epochs}")
        print(f"Training - Loss: {avg_train_loss:.4f}, Accuracy: {avg_train_accuracy:.4f}")
        print(f"Validation - Loss: {avg_val_loss:.4f}, Accuracy: {avg_val_accuracy:.4f}")
        print(f"Time taken: {epoch_duration:.2f} seconds")
        print("-" * 50)
        
    return train_losses, val_losses, train_accuracies, val_accuracies
    


In [30]:
def plot_metrics(train_losses: List[float], val_losses: List[float], train_accuracies: List[float], val_accuracies: List[float]) -> None:
    """
    Plots training and validation losses and accuracies.

    Args:
        train_losses (List[float]): Training losses for each epoch.
        val_losses (List[float]): Validation losses for each epoch.
        train_accuracies (List[float]): Training accuracies for each epoch.
        val_accuracies (List[float]): Validation accuracies for each epoch.
    """
    plt.figure(figsize=(12, 5))

    # Plotting training and validation loss
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.title('Training and Validation Loss')
    plt.xlabel('Epochs')
    plt.ylabel('Loss')
    plt.legend()

    # Plotting training and validation accuracy
    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Training Accuracy')
    plt.plot(val_accuracies, label='Validation Accuracy')
    plt.title('Training and Validation Accuracy')
    plt.xlabel('Epochs')
    plt.ylabel('Accuracy')
    plt.legend()

    plt.show()
    return

In [31]:
def evaluate_model(model: nn.Module, data_loader: DataLoader, device: torch.device) -> Tuple[List[int], List[int]]:
    """
    Evaluates the model on a given dataset.

    Args:
        model (nn.Module): The model to evaluate.
        data_loader (DataLoader): DataLoader for the dataset to evaluate.
        device (torch.device): The device to run the evaluation on.

    Returns:
        Tuple[List[int], List[int]]: Lists of true labels and predicted labels.
    """
    print("Evaluating...")
    model.eval()
    all_labels, all_predictions = [], []

    with torch.no_grad():
        for images, labels in data_loader:
            images, labels = images.to(device), labels.to(device)
            output = model(images)
            
            if isinstance(output, torch.Tensor):
                logits = output
            else:  # Assuming it's an ImageClassifierOutput or similar
                logits = output.logits
    
            _, predicted = torch.max(logits, 1)
            all_labels.extend(labels.cpu().numpy().flatten())
            all_predictions.extend(predicted.cpu().numpy().flatten())

    return all_labels, all_predictions


In [32]:
def calculate_and_plot_metrics(all_labels: List[int], all_predictions: List[int]) -> np.ndarray:
    """
    Calculates and plots the evaluation metrics.

    Args:
        all_labels (List[int]): List of true labels.
        all_predictions (List[int]): List of predicted labels.

    Returns:
        None
    """
    test_accuracy = np.sum(np.array(all_labels) == np.array(all_predictions)) / len(all_labels)
    print(f"Total labels: {len(all_labels)}, Total predictions: {len(all_predictions)}")
    print(f"Test Accuracy: {test_accuracy * 100:.2f}%")

    cm = confusion_matrix(all_labels, all_predictions)
    cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    report = classification_report(all_labels, all_predictions, target_names=config["emotion_labels"])
    print(report)
    
    fig, (ax1, ax2) = plt.subplots(ncols=2, figsize=(20, 8))

    sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', xticklabels=config["emotion_labels"], yticklabels=config["emotion_labels"], ax=ax1)
    ax1.set_title('Confusion Matrix')
    ax1.set_xlabel('Predicted')
    ax1.set_ylabel('True')

    sns.heatmap(cm_normalized, annot=True, fmt='.2f', cmap='Blues', xticklabels=config["emotion_labels"], yticklabels=config["emotion_labels"], ax=ax2)
    ax2.set_title('Normalized Confusion Matrix')
    ax2.set_xlabel('Predicted')
    ax2.set_ylabel('True')

    plt.tight_layout()
    plt.show()
    
    return None

In [33]:
class EmotionClassifierNN(nn.Module):
    def __init__(self, input_size=512, num_classes=7, dropout_rate=0.5):
        
        super(EmotionClassifierNN, self).__init__()
        self.fc1 = nn.Linear(input_size, 256)
        self.dropout1 = nn.Dropout(dropout_rate)  # Dropout layer after the first FC layer
        self.fc2 = nn.Linear(256, 128)
        self.dropout2 = nn.Dropout(dropout_rate)  # Dropout layer after the second FC layer
        self.fc3 = nn.Linear(128, num_classes)
        
        # Initialize layers' weights
        torch.nn.init.xavier_uniform_(self.fc1.weight)
        torch.nn.init.xavier_uniform_(self.fc2.weight)
        torch.nn.init.xavier_uniform_(self.fc3.weight)

    def forward(self, x):
        
        x = F.relu(self.fc1(x))
        x = self.dropout1(x)  # Applying dropout after activation
        x = F.relu(self.fc2(x))
        x = self.dropout2(x)  # Applying dropout after activation
        x = self.fc3(x)
        return x

In [34]:
def run_training_and_evaluation(
    custom_dataset: Dataset = None,
    custom_model: nn.Module=None,
    use_embeddings: bool=False,
    unfreeze_layers: list=None,
    num_epochs: int=config["num_epochs"],
):
    """
    Runs the full training and evaluation pipeline without returning any values. 
    Outputs are handled by printing within the function and called sub-functions.

    Args:
        custom_dataset (Dataset, optional): Custome dataset to use instead of saved embeddings
        custom_model (nn.Module, optional): Custom model to use instead of a pretrained one.
        use_embeddings (bool): Flag indicating whether to train on embeddings.
    """
    if use_embeddings:
        embeddings = np.load(config["fer2013_resnet18_embeddings_path"])
        labels = np.load(config["fer2013_resnet18_labels_path"])
        train_loader, val_loader, test_loader = create_dataloaders(data=embeddings, labels=labels, use_embeddings=True)
    else:
        dataset = custom_dataset(config["hdf5_path"])
        train_loader, val_loader, test_loader = create_dataloaders(dataset)

    if custom_model:
        model, criterion, optimizer = initialize_resnet_model(custom_model=custom_model)
    else:
        model, criterion, optimizer = initialize_resnet_model(unfreeze_layers=unfreeze_layers)

    metrics = train_and_validate(model, train_loader, val_loader, config, criterion, optimizer, num_epochs)
    plot_metrics(*metrics)
    
    all_labels, all_predictions = evaluate_model(model, test_loader, config["device"])
    calculate_and_plot_metrics(all_labels, all_predictions)


In [35]:
run_training_and_evaluation(CustomDataset)

creating data loaders
Initializing Resnet Model...




Training and Validating...
Epoch: 1/30
Training - Loss: 1.2567, Accuracy: 0.5450
Validation - Loss: 1.0907, Accuracy: 0.6117
Time taken: 80.69 seconds
--------------------------------------------------


KeyboardInterrupt: 

In [16]:
# run_training_and_evaluation(CustomDataset, unfreeze_layers=config["unfreeze_layers"])

In [17]:
# custom_model = EmotionClassifierNN(input_size=512, num_classes=7) 
# run_training_and_evaluation(use_embeddings=True, custom_model=custom_model, num_epochs=150)

In [72]:
config = {
    'hdf5_path': "../../data/fer_2013_processed.h5",
    'fer2013_resnet18_embeddings_path': "../../data/fer2013_resnet18_embeddings.npy",
    'fer2013_resnet18_labels_path': "../../data/fer2013_resnet18_labels.npy",
    'model_name': 'google/vit-small-patch16-224',  # Assuming you're focusing on ViT small
    'batch_size': 32,
    'learning_rate': 0.0001,
    'num_epochs': 10,
    'device': torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
    'emotion_labels': ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral'],
    'L2_regularization': 1e-4,
    'unfreeze_layers': [
        'vit.encoder.layer.10', 
        'vit.encoder.layer.11', 
        'vit.layernorm', 
        'classifier.weight', 
        'classifier.bias'
    ],
}

# Recalculating class weights in case of changes
labels = np.load(config['fer2013_resnet18_labels_path'])
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(labels), y=labels)
config["class_weights"] = torch.tensor(class_weights, dtype=torch.float).to(config['device'])


In [19]:



def initialize_vit_model(model_name: str, num_classes: int, unfreeze_layers: list = None):
    """
    Initializes a Vision Transformer model for training, with the option for partial layer training.
    
    Args:
    - model_name (str): Identifier for the pre-trained model (e.g., 'google/vit-small-patch16-224').
    - num_classes (int): Number of classes for the classification head.
    - unfreeze_layers (list, optional): Specific layers to unfreeze for training. If None, all layers are trainable.
    
    Returns:
    - model (nn.Module): The initialized ViT model.
    - criterion (nn.Module): The loss function.
    - optimizer (Optimizer): The optimizer for training.
    """
    print("Initializing ViT Model...")
    
    # Load the pre-trained model without specifying num_labels to avoid the mismatch error
    model = ViTForImageClassification.from_pretrained(model_name)
    
    # Manually adjust the classifier to match the number of classes for your specific task
    model.classifier = nn.Linear(model.config.hidden_size, num_classes)

    # The rest of your function remains the same
    # Handling partial layer training
    if unfreeze_layers is not None:
        # Initially freeze all parameters
        for param in model.parameters():
            param.requires_grad = False
        
        # Unfreeze specified layers
        for name, param in model.named_parameters():
            if any(layer in name for layer in unfreeze_layers):
                print(f"Unfreezing layer: {name}")
                param.requires_grad = True
    else:
        # If no layers specified, all parameters are trainable
        for param in model.parameters():
            param.requires_grad = True

    model = model.to(config['device'])
    criterion = nn.CrossEntropyLoss(weight=config.get('class_weights'))
    optimizer = torch.optim.Adam(filter(lambda p: p.requires_grad, model.parameters()), 
                           lr=config['learning_rate'], 
                           weight_decay=config.get('L2_regularization'))

    return model, criterion, optimizer


In [80]:
def run_training_and_evaluation(model_type: str, use_embeddings: bool = False, num_epochs: int = config["num_epochs"]):
    """
    Adjusted function to run training and evaluation, supporting both ResNet and ViT models.
    
    Args:
    - model_type (str): The type of model to use ('resnet' or 'vit').
    - use_embeddings (bool): Indicates if training should be on precomputed embeddings.
    - num_epochs (int): Number of epochs to train for.
    """
    if use_embeddings:
        # Logic for handling embeddings remains the same.
        pass
    else:
        dataset = CustomDataset(config["hdf5_path"])
        train_loader, val_loader, test_loader = create_dataloaders(dataset)

    if model_type == 'vit':
        model, criterion, optimizer = initialize_vit_model(model_name='google/vit-base-patch16-224', num_classes=len(config['emotion_labels']))
    else:
        # Default to using ResNet or any other model initialization logic you have
        model, criterion, optimizer = initialize_resnet_model(model_name=config["model_name"], unfreeze_layers=config["unfreeze_layers"])

    metrics = train_and_validate(model, train_loader, val_loader, config, criterion, optimizer, num_epochs)
    plot_metrics(*metrics)
    
    all_labels, all_predictions = evaluate_model(model, test_loader, config["device"])
    calculate_and_plot_metrics(all_labels, all_predictions)


In [81]:
run_training_and_evaluation("vit")

creating data loaders
Initializing ViT Model...
Training and Validating...


OutOfMemoryError: CUDA out of memory. Tried to allocate 20.00 MiB. GPU 0 has a total capacty of 10.91 GiB of which 2.06 MiB is free. Process 28572 has 10.91 GiB memory in use. Of the allocated memory 9.65 GiB is allocated by PyTorch, and 426.12 MiB is reserved by PyTorch but unallocated. If reserved but unallocated memory is large try setting max_split_size_mb to avoid fragmentation.  See documentation for Memory Management and PYTORCH_CUDA_ALLOC_CONF

In [16]:
# Saving resnent embeddings
config = {
    'hdf5_path': "../../data/fer_2013_processed.h5",
    'fer2013_resnet18_embeddings_path': "../../data/FER/fer2013_resnet18_embeddings.npy",
    'fer2013_resnet18_labels_path': "../../data/FER/fer2013_resnet18_labels.npy",
    'model_name': "resnet18",
    'batch_size': 32,
    'learning_rate': 0.0001,
    'num_epochs': 30,
    'device': torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
    'emotion_labels': ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral'],
    'L2_regularization': 1e-4,
    'unfreeze_layers': ['layer4', 'fc'],
}

labels = np.load("../../data/fer2013_resnet18_labels.npy")
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(labels), y=labels)
config["class_weights"] = torch.tensor(class_weights, dtype=torch.float).to(config['device'])

In [17]:
# same as CustomDataset above
class CustomDataset(Dataset):
    def __init__(self, hdf5_filename, transform=None):
        
        self.file = h5py.File(hdf5_filename, 'r')
        self.images = self.file['images']
        self.labels = self.file['labels']
        
        # we can add data augmentations (overfitting here)
        self.transform = transform or transforms.Compose([
            transforms.ToPILImage(),
            transforms.Resize((224, 224)),
            transforms.Grayscale(num_output_channels=3),
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        self.emotion_labels = ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral']


    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        image = torch.tensor(self.images[idx], dtype=torch.float32)
        label = torch.tensor(self.labels[idx], dtype=torch.long)
        
        if self.transform:
            image = self.transform(image)
            
        return image, label

    def close(self):
        self.file.close()

In [18]:
def extract_and_save_resnet_embeddings(dataset: Dataset, model_name: str, output_path_embeddings: str, output_path_labels: str):
    """
    Extracts embeddings from a dataset using a specified model and saves them.
    """
    device = config['device']
    
    model = getattr(models, model_name)(pretrained=True)
    model.fc = nn.Identity()
    model = model.to(device).eval()
    loader = DataLoader(dataset, batch_size=config['batch_size'], shuffle=True, num_workers=4)
    
    embeddings, labels = [], []
    with torch.no_grad():
        for images, label in loader:
            images = images.to(device)
            output = model(images)
            embeddings.append(output.cpu().numpy())
            labels.append(label.cpu().numpy())
    
    embeddings = np.concatenate(embeddings, axis=0)
    labels = np.concatenate(labels, axis=0)
    
    output_dir = os.path.dirname(output_path_embeddings)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    # Save the embeddings
    np.save(output_path_embeddings, embeddings)
    np.save(output_path_labels, labels)
    
    return embeddings, labels

In [19]:
dataset = CustomDataset(config["hdf5_path"])
embeddings, labels = extract_and_save_resnet_embeddings(dataset, config["model_name"],
                                                        config["fer2013_resnet18_embeddings_path"],
                                                        config["fer2013_resnet18_labels_path"],
                                                       )



In [22]:
# Saving VIT embeddings
config = {
    'hdf5_path': "../../data/fer_2013_processed.h5",
    'fer2013_VIT_embeddings_path': "../../data/VIT/vit_embeddings.npy",
    'fer2013_VIT_labels_path': "../../data/VIT/vit_labels.npy",
    'model_name': 'google/vit-small-patch16-224',  # Assuming you're focusing on ViT small
    'batch_size': 32,
    'learning_rate': 0.0001,
    'num_epochs': 10,
    'device': torch.device("cuda:0" if torch.cuda.is_available() else "cpu"),
    'emotion_labels': ['Angry', 'Disgust', 'Fear', 'Happy', 'Sad', 'Surprise', 'Neutral'],
    'L2_regularization': 1e-4,
    'unfreeze_layers': [
        'vit.encoder.layer.10', 
        'vit.encoder.layer.11', 
        'vit.layernorm', 
        'classifier.weight', 
        'classifier.bias'
    ],
}

# Recalculating class weights in case of changes
labels = np.load(config['fer2013_VIT_labels_path'])
class_weights = compute_class_weight(class_weight='balanced', classes=np.unique(labels), y=labels)
config["class_weights"] = torch.tensor(class_weights, dtype=torch.float).to(config['device'])


In [13]:
def extract_and_save_vit_embeddings(dataset: Dataset, model_name: str, output_path_embeddings: str, output_path_labels: str):
    """
    Extracts embeddings from a dataset using a specified Vision Transformer model and saves them.
    """
    device = config['device']
    vit_config = ViTConfig.from_pretrained(model_name)
    model = ViTModel.from_pretrained(model_name, config=vit_config).to(device)
    model.eval()
    
    loader = DataLoader(dataset, batch_size=config['batch_size'], shuffle=False, num_workers=4)
    
    embeddings, labels = [], []
    with torch.no_grad():
        for images, label in loader:
            images = images.to(device)
            # Get the last hidden states from the base transformer model
            outputs = model(pixel_values=images)
            # ViTModel outputs object with last_hidden_state key
            last_hidden_state = outputs.last_hidden_state
            # Pool the outputs into a single vector per image
            pooled_output = last_hidden_state.mean(dim=1)
            embeddings.append(pooled_output.cpu().numpy())
            labels.append(label.cpu().numpy())
    
    embeddings = np.concatenate(embeddings, axis=0)
    labels = np.concatenate(labels, axis=0)
    
    # Save the embeddings and labels
    output_dir = os.path.dirname(output_path_embeddings)
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
        
    np.save(output_path_embeddings, embeddings)
    np.save(output_path_labels, labels)
    
    return embeddings, labels

In [14]:
# Usage
dataset = CustomDataset(config["hdf5_path"])
valid_model_name = "google/vit-base-patch16-224-in21k"
extract_and_save_vit_embeddings(dataset, valid_model_name, 
                                "../../data/VIT/vit_embeddings.npy",
                                "../../data/VIT/vit_labels.npy")

(array([[-0.01367931, -0.02108475, -0.26395455, ..., -0.15906289,
          0.10478855,  0.04918496],
        [-0.11447463,  0.20910661,  0.11022235, ..., -0.19749787,
         -0.08703191, -0.10839844],
        [ 0.08143617, -0.16727525, -0.16500036, ..., -0.02814235,
          0.02242826, -0.05163503],
        ...,
        [-0.12995712,  0.16077752, -0.07059553, ..., -0.2970761 ,
          0.0304957 ,  0.01058937],
        [-0.10362991, -0.09267259, -0.22218071, ..., -0.03878455,
         -0.00988647,  0.13555978],
        [ 0.01844411, -0.09121744, -0.17620955, ...,  0.06358803,
         -0.01801367,  0.10577129]], dtype=float32),
 array([0, 0, 2, ..., 0, 3, 2]))