In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.datasets as datasets
import torchvision.transforms as transforms
import torchvision.models as models
import timm
import numpy as np
from tqdm import tqdm
from torch.utils.data import DataLoader, random_split

In [None]:
import torch

weights_file = "/kaggle/input/weights/keypoints_mae_resnet18_rafdb_epoch49.pth"
weights = torch.load(weights_file, map_location=torch.device('cpu'))  # or 'cuda' if using GPU

print(weights.keys())

In [None]:
encoder_weights = {k.replace("encoder.", ""): v for k, v in weights.items() if k.startswith("encoder.")}

model = timm.create_model(
    model_name="resnet18",
    pretrained=False,
)
model.fc = nn.Identity()
model.global_pool = nn.Identity()

model.load_state_dict(encoder_weights)

In [None]:
num_classes = 7

model.global_pool = nn.AdaptiveAvgPool2d(1)
model.fc = nn.Sequential(
    nn.Flatten(),  # Flatten to [batch_size, 512]
    nn.Linear(in_features=512, out_features=num_classes)  # Map to [batch_size, num_classes]
)

In [None]:
import subprocess
import os
import shutil

def install_package(package):
    subprocess.check_call(["pip", "install", package])
    
def download_file(file_id, output_name):
    install_package("gdown")
    import gdown
    gdown.download(id=file_id, output=output_name, quiet=False)

def unzip_file(zip_file):
    import zipfile
    with zipfile.ZipFile(zip_file, 'r') as zip_ref:
        zip_ref.extractall()
    os.remove(zip_file)

def remove_directory(directory):
    if os.path.exists(directory):
        shutil.rmtree(directory)
        

def download_fer2013():
    download_file('1YBuZaO7morIG43trYi0qtdelYGukBNCj', 'FER2013.zip')
    unzip_file('FER2013.zip')

def download_ferplus():
    download_file('1LShk6tZlsdBO-DciChK7y7nivUOvTAFk', 'FERPlus.zip')
    unzip_file('FERPlus.zip')
    remove_directory('fer_plus/train/contempt')
    remove_directory('fer_plus/val/contempt')
    remove_directory('fer_plus/test/contempt')


In [None]:
import torchvision.transforms as transforms
import torchvision.datasets as datasets
import torch

print("Loading data...")

target_size = 100
mean=[0.485, 0.456, 0.406]
std=[0.229, 0.224, 0.225]

batch_size = 512

data_transforms_FER = {
    'train': transforms.Compose([
        transforms.RandomResizedCrop(target_size, scale=(0.8, 1.2)),
        transforms.RandomApply([transforms.RandomAffine(0, translate=(0.2, 0.2))], p=0.5),
        transforms.RandomHorizontalFlip(),
        transforms.RandomApply([transforms.RandomRotation(10)], p=0.5),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std)
    ]),
    'val': transforms.Compose([
        transforms.Resize((target_size, target_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std)
    ]),
    'test': transforms.Compose([
        transforms.Resize((target_size, target_size)),
        transforms.ToTensor(),
        transforms.Normalize(mean=mean, std=std)
    ])
}



dataset_name = "rafdb"

if dataset_name == "rafdb":
    # Rafdb
    train_dataset = datasets.ImageFolder(root='/kaggle/input/raf-db-dataset/DATASET/train', transform=data_transforms_FER['train'])
    testset = datasets.ImageFolder(root='/kaggle/input/raf-db-dataset/DATASET/test', transform=data_transforms_FER['test'])
    seed = torch.Generator().manual_seed(64)
    # Define the split ratios
    train_size = int(0.8 * len(train_dataset))  # 80% for training
    val_size = len(train_dataset) - train_size  # Remaining 20% for validation
    # Perform the split
    trainset, valset = random_split(train_dataset, [train_size, val_size], generator=seed)
elif dataset_name == "fer2013":
    download_fer2013()
    # Fer2013
    trainset = datasets.ImageFolder(
        root='org_fer2013/train',
        transform=data_transforms_FER['train']
    )
    valset = datasets.ImageFolder(
        root='org_fer2013/val',
        transform=data_transforms_FER['val']
    )
    testset = datasets.ImageFolder(
        root='org_fer2013/test',
        transform=data_transforms_FER['test']
    )
elif dataset_name == "ferplus":
    download_ferplus()
    # Ferplus
    trainset = datasets.ImageFolder(
        root='fer_plus/train',
        transform=data_transforms_FER['train']
    )
    valset = datasets.ImageFolder(
        root='fer_plus/val',
        transform=data_transforms_FER['val']
    )
    testset = datasets.ImageFolder(
        root='fer_plus/test',
        transform=data_transforms_FER['test']
    )
    

train_loader = torch.utils.data.DataLoader(
    trainset,
    batch_size=batch_size,
    shuffle=True,
    num_workers=4
)

val_loader = torch.utils.data.DataLoader(
    valset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4
)

test_loader = torch.utils.data.DataLoader(
    testset,
    batch_size=batch_size,
    shuffle=False,
    num_workers=4
)

print("Data loading completed")

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from tqdm import tqdm

# Define device
device = torch.device('cuda:0' if torch.cuda.is_available() else 'cpu')

# Move model to device
model = model.to(device)

# Define loss function and optimizer
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=0.01)

# Define the learning rate scheduler
scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode='min', factor=0.1, patience=5, verbose=True)

# Training parameters
epochs = 120

# Metrics storage
train_losses = []
train_accuracies = []
val_losses = []
val_accuracies = []

best_val_loss = float('inf')

# Training and validation loop
for epoch in range(epochs):
    # Training phase
    model.train()
    epoch_train_loss = 0.0
    train_correct = 0
    train_total = 0

    train_loader_tqdm = tqdm(train_loader, desc="Training", leave=False)
    for images, labels in train_loader_tqdm:
        images, labels = images.to(device), labels.to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward pass and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        # Update metrics
        epoch_train_loss += loss.item()
        _, predicted = torch.max(outputs, 1)
        train_total += labels.size(0)
        train_correct += (predicted == labels).sum().item()

    # Calculate training metrics for the epoch
    train_loss = epoch_train_loss / len(train_loader)
    train_accuracy = 100 * train_correct / train_total

    train_losses.append(train_loss)
    train_accuracies.append(train_accuracy)

    # Validation phase
    model.eval()
    epoch_val_loss = 0.0
    val_correct = 0
    val_total = 0

    val_loader_tqdm = tqdm(val_loader, desc="Validation", leave=False)
    with torch.no_grad():
        for images, labels in val_loader_tqdm:
            images, labels = images.to(device), labels.to(device)

            # Forward pass
            outputs = model(images)
            loss = criterion(outputs, labels)

            # Update metrics
            epoch_val_loss += loss.item()
            _, predicted = torch.max(outputs, 1)
            val_total += labels.size(0)
            val_correct += (predicted == labels).sum().item()

    # Calculate validation metrics for the epoch
    val_loss = epoch_val_loss / len(val_loader)
    val_accuracy = 100 * val_correct / val_total

    val_losses.append(val_loss)
    val_accuracies.append(val_accuracy)

    # Step the scheduler
    scheduler.step(val_loss)

    # Save the best model
    if val_loss < best_val_loss:
        torch.save(model.state_dict(), f'mae_resnet18_{dataset_name}_mask0.75_epoch{epoch}.pth')
        best_val_loss = val_loss

    # Print epoch metrics
    print(f"Epoch [{epoch + 1}/{epochs}]")
    print(f"  Training Loss: {train_loss:.4f}, Training Accuracy: {train_accuracy:.2f}%")
    print(f"  Validation Loss: {val_loss:.4f}, Validation Accuracy: {val_accuracy:.2f}%")

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.metrics import confusion_matrix, ConfusionMatrixDisplay
import numpy as np

# Initialize test loss and confusion matrix storage
test_loss = 0.0
all_labels = []
all_predictions = []

model.eval()
test_loader_tqdm = tqdm(test_loader, desc="Testing", leave=True)
with torch.no_grad():
    for images, labels in test_loader_tqdm:
        images, labels = images.to(device), labels.to(device)

        # Perform inference
        outputs = model(images)
        loss = criterion(outputs, labels)
        test_loss += loss.item()

        # Get predictions and true labels
        _, predicted = torch.max(outputs, 1)
        all_labels.extend(labels.cpu().numpy())
        all_predictions.extend(predicted.cpu().numpy())

# Calculate test accuracy
test_accuracy = 100 * np.sum(np.array(all_predictions) == np.array(all_labels)) / len(all_labels)
test_loss /= len(test_loader)

print(f"Test Loss: {test_loss:.4f}")
print(f"Test Accuracy: {test_accuracy:.2f}%")

In [None]:
# Confusion matrix
cm = confusion_matrix(all_labels, all_predictions)
# Normalize the confusion matrix to display percentages
cm_percentage = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis] * 100

if dataset_name == "rafdb":
    # Define RAF-DB specific class mappings
    index_to_class = {
        0: "Angry",
        1: "Disgust",
        2: "Fear",
        3: "Happy",
        4: "Neutral",
        5: "Sad",
        6: "Surprise"
    }
    classes = [index_to_class[i] for i in range(len(index_to_class))]  # Ensure consistency
else:
    # Use the default class names from the dataset
    classes = test_loader.dataset.classes

# Plot and save confusion matrix
plt.figure(figsize=(10, 8))
sns.heatmap(cm_percentage, annot=True, fmt=".2f", cmap="Blues", xticklabels=classes, yticklabels=classes)
plt.title(f"Confusion Matrix - {dataset_name} classification (%)")
plt.xlabel("Predicted Labels")
plt.ylabel("True Labels")
plt.tight_layout()
plt.savefig("confusion_matrix.png")
plt.show()

In [None]:
import matplotlib.pyplot as plt

# Function to plot and save training and validation metrics side by side
def plot_and_save_metrics(train_losses, val_losses, train_accuracies, val_accuracies, save_dir="metrics_plots"):
    import os
    os.makedirs(save_dir, exist_ok=True)  # Ensure the save directory exists

    
    # Create a figure for losses
    plt.figure(figsize=(12, 5))

    # Training Loss
    plt.subplot(1, 2, 1)
    plt.plot(train_losses, label='Training Loss')
    plt.title('Training Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    # Training Accuracy
    plt.subplot(1, 2, 2)
    plt.plot(train_accuracies, label='Training Accuracy')
    plt.title('Training Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()


    # Save and show the losses figure
    plt.tight_layout()
    plt.savefig(f"{save_dir}/training.png")
    plt.show()
    plt.close()


    
    # Create a figure for accuracies
    plt.figure(figsize=(12, 5))

    # Validation Loss
    plt.subplot(1, 2, 1)
    plt.plot(val_losses, label='Validation Loss', color='orange')
    plt.title('Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.legend()

    # Validation Accuracy
    plt.subplot(1, 2, 2)
    plt.plot(val_accuracies, label='Validation Accuracy', color='orange')
    plt.title('Validation Accuracy')
    plt.xlabel('Epoch')
    plt.ylabel('Accuracy (%)')
    plt.legend()

    # Save and show the accuracies figure
    plt.tight_layout()
    plt.savefig(f"{save_dir}/validation.png")
    plt.show()
    plt.close()

    print(f"Plots saved in {save_dir}")

# Example usage
# Assuming train_losses, val_losses, train_accuracies, and val_accuracies are populated
plot_and_save_metrics(train_losses, val_losses, train_accuracies, val_accuracies)