In [1]:
# # This Python 3 environment comes with many helpful analytics libraries installed
# # It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# # For example, here are several helpful packages to load

# import numpy as np # linear algebra
# import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# # Input data files are available in the read-only "../input/" directory
# # For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

# import os
# for dirname, _, filenames in os.walk('/kaggle/input'):
#     for filename in filenames:
#         print(os.path.join(dirname, filename))

# # You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# # You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

In [2]:
# !mkdir /kaggle/working/AffectNet

In [3]:
# !ls /kaggle/input/affectnet

In [4]:
# !cp -r /kaggle/input/AffectNet/ /kaggle/working/AffectNet

In [5]:
# !tar -xvf /kaggle/working/AffectNet/AffectNet/train_set.tar -C /content/AffectNet/AffectNet/

In [6]:
# !tar -xvf /kaggle/working/AffectNet/AffectNet/val_set.tar -C /content/AffectNet/AffectNet/

In [7]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset
from torchvision import datasets
from torchvision.io import read_image
from torch.utils.data import DataLoader, random_split
from torchvision.transforms import ToTensor
from torchvision.transforms.functional import to_pil_image
from transformers import CLIPProcessor, CLIPModel
from tqdm.auto import tqdm
from PIL import Image
import numpy as np
import os
import optuna

ModuleNotFoundError: No module named 'optuna'

In [None]:
# Run on the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

In [35]:
class ClipFERModel(nn.Module):
    """CLIP image encoder followed by a small MLP head for expression classification.

    The pre-trained ``openai/clip-vit-base-patch32`` checkpoint produces the image
    embeddings; two linear layers with a ReLU in between map them to class scores.

    Args:
        num_classes: Number of output classes of the classification head.
    """

    def __init__(self, num_classes):
        super().__init__()
        # Load the pre-trained CLIP model and its matching image preprocessor.
        self.clip_model = CLIPModel.from_pretrained('openai/clip-vit-base-patch32')
        self.processor = CLIPProcessor.from_pretrained('openai/clip-vit-base-patch32')

        # The head is stacked on top of CLIP's image embeddings. Their width is
        # read from the checkpoint config (512 for this checkpoint) so the head
        # always matches what get_image_features returns.
        feature_dim = self.clip_model.config.projection_dim
        self.final_layer = nn.Linear(feature_dim, 256)
        self.relu = nn.ReLU()
        self.classification_head = nn.Linear(256, num_classes)

    def forward(self, images):
        """Compute class scores for a batch of images.

        Args:
            images: Iterable of image tensors; each element is converted with
                ``to_pil_image`` before being handed to the CLIP processor.

        Returns:
            Tensor of class scores, one row of ``num_classes`` values per image.
        """
        # Use the device the module's own weights live on rather than a
        # notebook-level global, so the model works wherever it was moved.
        model_device = next(self.parameters()).device

        pil_images = [to_pil_image(image) for image in images]
        inputs = self.processor(images=pil_images, return_tensors="pt").to(model_device)
        # Use only the image encoder to get image features.
        image_features = self.clip_model.get_image_features(**inputs)
        # Hidden layer, then the classification head.
        hidden = self.relu(self.final_layer(image_features))
        return self.classification_head(hidden)

In [27]:
class CustomAffectNetDataset(Dataset):
    """Dataset pairing every file in ``img_dir`` with its ``<stem>_exp.npy`` label.

    Args:
        annotations_dir: Directory containing the ``*_exp.npy`` label files.
        img_dir: Directory containing the image files.
        transform: Optional callable applied to the decoded image.
        target_transform: Optional callable applied to the integer label.
    """

    def __init__(self, annotations_dir, img_dir, transform=None, target_transform=None):
        self.img_dir = img_dir
        self.label_dir = annotations_dir
        # Sorted so that a given index always refers to the same file.
        self.img_files = sorted(os.listdir(img_dir))
        self.transform = transform
        self.target_transform = target_transform

    def __len__(self):
        """Return the number of entries found in the image directory."""
        return len(self.img_files)

    def __getitem__(self, idx):
        """Return ``(image, label)`` for the ``idx``-th file in sorted order."""
        img_name = self.img_files[idx]
        image = read_image(os.path.join(self.img_dir, img_name))

        # Strip only the final extension: splitting on the first '.' would
        # truncate names that contain additional dots and point at the wrong
        # (or a non-existent) annotation file.
        stem = os.path.splitext(img_name)[0]
        label_path = os.path.join(self.label_dir, f"{stem}_exp.npy")
        label = int(np.load(label_path))

        if self.transform:
            image = self.transform(image)
        if self.target_transform:
            label = self.target_transform(label)
        return image, label

In [28]:
# AffectNet is expected two directories above the working directory; each split
# stores its files under <split>/<split>/images and <split>/<split>/annotations.
TRAIN_IMG_PATH, TRAIN_LABELS_PATH, VAL_IMG_PATH, VAL_LABELS_PATH = (
    os.path.join(os.getcwd(), "..", "..", "AffectNet", split, split, kind)
    for split in ("train_set", "val_set")
    for kind in ("images", "annotations")
)

In [29]:
def train_model(model, train_loader, criterion, optimizer, device, label_to_ignore=7):
    """Train ``model`` for one pass over ``train_loader``.

    Samples whose label equals ``label_to_ignore`` are dropped from every batch
    before the forward pass.

    Args:
        model: Module to train; called as ``model(images)``.
        train_loader: Iterable yielding ``(images, labels)`` tensor batches.
        criterion: Loss callable invoked as ``criterion(outputs, labels)``.
        optimizer: Optimizer that updates ``model``'s parameters.
        device: Device the filtered batch is moved to before the forward pass.
        label_to_ignore: Label value removed from each batch (default 7).

    Returns:
        Mean loss over the batches that were actually trained on, or ``0.0``
        when no batch contained a usable sample.
    """
    model.train()
    total_loss = 0.0
    batches_used = 0
    for images, labels in tqdm(train_loader):
        # Ignore contempt since deepface ignores it
        relevance_mask = labels != label_to_ignore
        if not relevance_mask.any():
            # Nothing left after filtering: an empty batch would produce a NaN
            # loss (or fail inside the model), so skip it entirely.
            continue
        images = images[relevance_mask].to(device)
        labels = labels[relevance_mask].to(device)

        # Forward pass
        outputs = model(images)
        loss = criterion(outputs, labels)

        # Backward and optimize
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()
        batches_used += 1

    # Average over processed batches only, so skipped batches do not dilute the
    # mean and an empty loader does not raise ZeroDivisionError.
    return total_loss / batches_used if batches_used else 0.0

In [30]:
def validate_model(model, val_loader, device, label_to_ignore=7):
    """Compute classification accuracy (in percent) of ``model`` on ``val_loader``.

    Samples whose label equals ``label_to_ignore`` are excluded from both the
    forward pass and the accuracy count.

    Args:
        model: Module to evaluate; called as ``model(images)`` and expected to
            return one row of class scores per sample.
        val_loader: Iterable yielding ``(images, labels)`` tensor batches.
        device: Device the filtered batch is moved to before the forward pass.
        label_to_ignore: Label value removed from each batch (default 7).

    Returns:
        ``100 * correct / total`` over the evaluated samples, or ``0.0`` when
        no sample was evaluated.
    """
    model.eval()
    total = correct = 0
    with torch.no_grad():
        for images, labels in tqdm(val_loader):
            # Ignore contempt since deepface ignores it
            relevance_mask = labels != label_to_ignore
            if not relevance_mask.any():
                # Skip batches that are empty after filtering.
                continue
            images = images[relevance_mask].to(device)
            labels = labels[relevance_mask].to(device)

            # Highest-scoring class per sample.
            predicted = model(images).argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    # Guard against division by zero when every sample was filtered out.
    return 100 * correct / total if total else 0.0

In [None]:
# Instantiate the 7-class CLIP-based classifier and move it to the selected device.
model = ClipFERModel(num_classes=7).to(device)

In [None]:
def objective(trial):
    """Optuna objective: train a fresh model for one epoch and score it.

    Samples a batch size and a learning rate, trains a newly created
    ``ClipFERModel`` for one pass over a 20% subset of the training set, and
    evaluates it on a disjoint 5% subset.

    Args:
        trial: Optuna trial used to sample the hyperparameters.

    Returns:
        Validation accuracy in percent (the value the study maximizes).
    """
    # Define the hyperparameters
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64, 128])
    # suggest_float(..., log=True) replaces the deprecated suggest_loguniform.
    lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)

    # Build a new model for every trial: reusing one shared instance would let
    # the weight updates of earlier trials leak into later ones and make the
    # reported accuracies incomparable.
    trial_model = ClipFERModel(num_classes=7).to(device)
    criterion = nn.CrossEntropyLoss()
    optimizer = optim.Adam(trial_model.parameters(), lr=lr)

    training_dataset = CustomAffectNetDataset(TRAIN_LABELS_PATH, TRAIN_IMG_PATH)
    total_size = len(training_dataset)
    train_size = int(0.2 * total_size)  # 20% of the dataset for training
    val_size = int(0.05 * total_size)  # 5% for validation
    # Fixed seed so every trial is trained and scored on the same subsets; the
    # remainder of the dataset is split off and left unused.
    split_generator = torch.Generator().manual_seed(0)
    training_data, validation_data, _ = random_split(
        training_dataset,
        [train_size, val_size, total_size - train_size - val_size],
        generator=split_generator,
    )

    # Prepare the data loaders with the batch size sampled for this trial.
    train_loader = DataLoader(training_data, batch_size=batch_size, shuffle=True)
    val_loader = DataLoader(validation_data, batch_size=batch_size, shuffle=False)

    # Train for one epoch, then return the accuracy as the score to maximize.
    train_model(trial_model, train_loader, criterion, optimizer, device)
    return validate_model(trial_model, val_loader, device)


In [None]:
# Create a study that maximizes the value returned by `objective`,
# then run 20 trials of the search.
study = optuna.create_study(direction='maximize')
study.optimize(objective, n_trials=20)

In [None]:
# Report the best trial of the study: its objective value followed by one
# line per sampled hyperparameter.
print("Best trial:")
trial = study.best_trial
print(f"  Value: {trial.value}")
print(f"  Params: ")
for key, value in trial.params.items():
    print(f"    {key}: {value}")

In [31]:
training_dataset = CustomAffectNetDataset(TRAIN_LABELS_PATH, TRAIN_IMG_PATH)
total_size = len(training_dataset)
train_size = int(0.5 * total_size)  # 50% of the dataset for training
val_size = int(0.1 * total_size)  # 10% for validation
# The remaining samples are split off and left unused. The generator is seeded
# so the same split is produced on every Restart & Run All.
training_data, validation_data, _ = random_split(
    training_dataset,
    [train_size, val_size, total_size - train_size - val_size],
    generator=torch.Generator().manual_seed(0),
)

In [32]:
# Build a fresh 7-class model and the loaders over the train/validation subsets.
model = ClipFERModel(num_classes=7).to(device)
train_loader = DataLoader(training_data, batch_size=16, shuffle=True)
val_loader = DataLoader(validation_data, batch_size=16, shuffle=False)  # validation order stays fixed

criterion = nn.CrossEntropyLoss()
# Every parameter of `model` is handed to Adam (model.parameters()).
optimizer = optim.Adam(model.parameters(), lr=0.001)
num_epochs = 3
# Each epoch: one training pass, then an accuracy check on the validation loader.
for epoch in range(num_epochs):
    loss = train_model(model, train_loader, criterion, optimizer, device)
    accuracy = validate_model(model, val_loader, device)
    print(f'Epoch {epoch+1}: Loss = {loss:.4f}, Validation Accuracy = {accuracy:.2f}%')

  0%|          | 0/8990 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [20]:
# Force a full garbage-collection pass; the cell displays the value returned by gc.collect().
import gc

gc.collect()

18419