Script per eseguire la grid search sul modello clip vit large patch 

In [1]:
# Pre computa gli embedding di clip una volta sola e poi addestra il classificatore
import os
import json
import csv
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from PIL import Image
from torchvision import transforms
from transformers import CLIPProcessor, CLIPModel
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from tqdm import tqdm, trange
from itertools import product
import matplotlib.pyplot as plt
import re

import random
import numpy as np
SEED = 42
torch.manual_seed(SEED)
torch.cuda.manual_seed_all(SEED)
np.random.seed(SEED)
random.seed(SEED)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

# CONFIG
TRAIN_DATA_DIR = "../pre_processing/dataset/train/"
TRAIN_ANNOTATIONS_PATH = '../pre_processing/dataset/train.json'
VAL_DATA_DIR = "../pre_processing/dataset/val/"
VAL_ANNOTATIONS_PATH = '../pre_processing/dataset/val.json'
TEST_DATA_DIR = "../pre_processing/dataset/test/"
TEST_ANNOTATIONS_PATH = '../pre_processing/dataset/test.json'
BATCH_SIZE = 8
EPOCHS = 10
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
NOME_DEL_MODELLO_PRETRAINED = "openai/clip-vit-large-patch14"
#NOME_DEL_MODELLO_PRETRAINED = "openai/clip-vit-base-patch32"

# Iperparametri da esplorare
dropouts = [0.0] #[0.0, 0.1, 0.2, 0.3]
lrs = [0.01] #[0.01, 0.001, 0.0001]
weight_decays = [0] #[0, 1e-1, 1e-2, 1e-3, 1e-4, 1e-5]
schedulers = ['none'] #['none', 'StepLR', 'LinearLR', 'ExponentialLR', 'CosineAnnealingLR']
unlock_settings = [False] #[False, True]

# MOdello CLIP per la feature extraction
clip_model = CLIPModel.from_pretrained(NOME_DEL_MODELLO_PRETRAINED).to(DEVICE)
clip_processor = CLIPProcessor.from_pretrained(NOME_DEL_MODELLO_PRETRAINED)
clip_model.eval()

safe_nome_modello_pretrained = NOME_DEL_MODELLO_PRETRAINED.replace("/", "_")

# Funzione di utility per caricare le annotazioni
def load_annotations(path):
    with open(path, 'r', encoding='utf-8') as f:
        return json.load(f)

train_annotations = load_annotations(TRAIN_ANNOTATIONS_PATH)
val_annotations = load_annotations(VAL_ANNOTATIONS_PATH)
test_annotations = load_annotations(TEST_ANNOTATIONS_PATH)

all_labels = [a["label"] for a in train_annotations + val_annotations + test_annotations]
label_encoder = LabelEncoder()
label_encoder.fit(all_labels)

# FUnzione per trasformare le immagini
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    
    # transforms.RandomResizedCrop(224, scale=(0.8, 1.0)),
    # transforms.RandomHorizontalFlip(),
    # transforms.RandomRotation(degrees=15),
    # transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2, hue=0.1),
    
    transforms.ToTensor()
])

# Dataset per computare gli embeddings
class RawMultimodalDataset(Dataset):
    def __init__(self, annotations, img_folder, label_encoder):
        self.annotations = annotations
        self.img_folder = img_folder
        self.label_encoder = label_encoder

    def __len__(self):
        return len(self.annotations)

    def __getitem__(self, idx):
        item = self.annotations[idx]
        text = item["text"].lower().replace("\n", " ").strip()
        
        # Pulizia del testo
        text = text.lower()
        text = re.sub(r"\\n", " ", text).strip() 
        text = re.sub(r"\n", " ", text).strip() 
        text = re.sub(r"\\", " ", text).strip() 
        text = re.sub(r"  ", " ", text).strip()
        
        image_path = os.path.join(self.img_folder, item["label"], item["image"])
        image = Image.open(image_path).convert("RGB")
        image = transform(image)
        label = self.label_encoder.transform([item["label"]])[0]
        return text, image, torch.tensor(label, dtype=torch.float32)

# Precompute Embeddings
def compute_embeddings(dataset):
    text_embeds, image_embeds, labels = [], [], []
    for text, image, label in tqdm(dataset, desc="Computing Embeddings"):
        inputs = clip_processor(text=[text], images=image.unsqueeze(0), return_tensors="pt", padding=True, truncation=True).to(DEVICE)
        with torch.no_grad():
            clip_out = clip_model(**inputs)
        features = torch.cat([clip_out.text_embeds, clip_out.image_embeds], dim=1)
        text_embeds.append(features.squeeze(0).cpu())
        labels.append(label)
    return torch.stack(text_embeds), torch.tensor(labels)

# Caricamento dataset
train_raw = RawMultimodalDataset(train_annotations, TRAIN_DATA_DIR, label_encoder)
val_raw = RawMultimodalDataset(val_annotations, VAL_DATA_DIR, label_encoder)
test_raw = RawMultimodalDataset(test_annotations, TEST_DATA_DIR, label_encoder)

train_features, train_labels = compute_embeddings(train_raw)
val_features, val_labels = compute_embeddings(val_raw)
test_features, test_labels = compute_embeddings(test_raw)

# Dataset wrapper
class PrecomputedDataset(Dataset):
    def __init__(self, features, labels):
        self.features = features
        self.labels = labels

    def __len__(self):
        return len(self.labels)

    def __getitem__(self, idx):
        return self.features[idx], self.labels[idx]

train_dataset = PrecomputedDataset(train_features, train_labels)
val_dataset = PrecomputedDataset(val_features, val_labels)
test_dataset = PrecomputedDataset(test_features, test_labels)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE, shuffle=True)
val_loader = DataLoader(val_dataset, batch_size=BATCH_SIZE)
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)

# Modello di classificazione
class MultimodalClassifier(nn.Module):
    def __init__(self, input_dim, hidden_dim=512, dropout=0.0):
        super().__init__()
        self.classifier = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
            nn.Linear(hidden_dim, 1),
            nn.Sigmoid()
        )

    def forward(self, x):
        return self.classifier(x)

# File in cui trasferire i risultati
results_path = f"{safe_nome_modello_pretrained}_best_and_aug.csv"
with open(results_path, 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['dropout', 'lr', 'weight_decay', 'scheduler', 'unlocked',
                     'val_accuracy', 'val_precision', 'val_recall', 'val_f1',
                     'test_accuracy', 'test_precision', 'test_recall', 'test_f1'])


# Training Loop per Grid Search 
combinations = list(product(dropouts, lrs, weight_decays, schedulers, unlock_settings))

for dropout, lr, wd, sched_name, unlock in tqdm(combinations, desc="Grid Search", leave=True):
    
    model_name = f"./saves_model2.2/{safe_nome_modello_pretrained}_drop{dropout}_lrs{lr}_weights_{wd}_sched_{sched_name}_unlock{unlock}_augmented"
    
    #model_name = f"./saves_model2.2/openai_clip_vit_large_patch14_drop{dropout}_lrs{lr}_weights_{wd}_sched_{sched_name}_unlock{unlock}"
    safe_model_name = model_name #.replace("/", "_")

    #classifier = MultimodalClassifier(input_dim=768+768, dropout=dropout).to(DEVICE)
    clip_hidden_dim = clip_model.config.projection_dim
    classifier = MultimodalClassifier(input_dim=clip_hidden_dim * 2, dropout=dropout).to(DEVICE)

    
    criterion = nn.BCELoss()
    optimizer = torch.optim.Adam(classifier.parameters(), lr=lr, weight_decay=wd)

    if sched_name == 'StepLR':
        scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.5)
    elif sched_name == 'LinearLR':
        scheduler = torch.optim.lr_scheduler.LinearLR(optimizer, start_factor=0.9, total_iters=EPOCHS)
    elif sched_name == 'ExponentialLR':
        scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)
    elif sched_name == 'CosineAnnealingLR':
        scheduler = torch.optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=EPOCHS)
    else:
        scheduler = None

    train_losses = []
    val_losses = []
    
    if unlock:
        clip_model = CLIPModel.from_pretrained(NOME_DEL_MODELLO_PRETRAINED).to(DEVICE)
        clip_model.train()
    
        for param in clip_model.parameters():
            param.requires_grad = False  # Freeza tutto
    
        for param in clip_model.vision_model.encoder.layers[-3:].parameters():
            param.requires_grad = True
        for param in clip_model.text_model.encoder.layers[-3:].parameters():
            param.requires_grad = True
        for param in clip_model.vision_model.post_layernorm.parameters():
            param.requires_grad = True
        for param in clip_model.text_model.final_layer_norm.parameters():
            param.requires_grad = True


    #for epoch in trange(EPOCHS, desc=f"{safe_model_name}", leave=False):
    for epoch in range(EPOCHS):
        classifier.train()
        epoch_loss = 0
        for features, labels in train_loader:
            features = features.to(DEVICE)
            labels = labels.to(DEVICE)
            logits = classifier(features).squeeze()
            loss = criterion(logits, labels)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()
        train_losses.append(epoch_loss / len(train_loader))

        classifier.eval()
        val_loss = 0
        with torch.no_grad():
            for features, labels in val_loader:
                features = features.to(DEVICE)
                labels = labels.to(DEVICE)
                logits = classifier(features).squeeze()
                loss = criterion(logits, labels)
                val_loss += loss.item()
        val_losses.append(val_loss / len(val_loader))

        if scheduler:
            scheduler.step()

    def evaluate(loader):
        classifier.eval()
        all_preds, all_labels = [], []
        with torch.no_grad():
            for features, labels in loader:
                features = features.to(DEVICE)
                logits = classifier(features).squeeze()
                preds = (logits > 0.5).float()
                all_preds.extend(preds.cpu().tolist())
                all_labels.extend(labels.cpu().tolist())
        acc = accuracy_score(all_labels, all_preds)
        prec = precision_score(all_labels, all_preds, average='macro', zero_division=0)
        rec = recall_score(all_labels, all_preds, average='macro', zero_division=0)
        f1 = f1_score(all_labels, all_preds, average='macro', zero_division=0)
        return acc, prec, rec, f1

    val_metrics = evaluate(val_loader)
    test_metrics = evaluate(test_loader)

    with open(results_path, 'a', newline='') as f:
        writer = csv.writer(f)
        writer.writerow([dropout, lr, wd, sched_name, unlock, *val_metrics, *test_metrics])

    # Salva loss plot
    plt.figure()
    plt.plot(train_losses, label="Train Loss")
    plt.plot(val_losses, label="Val Loss")
    plt.xlabel("Epoch")
    plt.ylabel("Loss")
    plt.title(f"Loss Curve - {model_name}")
    plt.legend()
    plt.tight_layout()
    #plt.savefig(f"{safe_model_name}_loss_curve.png")
    plt.close()

    # Salva model
    #torch.save(classifier.state_dict(), f"{safe_model_name}.pt")
    #del classifier
    #torch.cuda.empty_cache()


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
Computing Embeddings:   0%|          | 0/1200 [00:00<?, ?it/s]It looks like you are trying to rescale already rescaled images. If the input images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again.
Computing Embeddings: 100%|██████████| 1200/1200 [01:11<00:00, 16.73it/s]
Computing Embeddings: 100%|██████████| 150/150 [00:11<00:00, 13.44it/s]
Computing Embeddings: 100%|██████████| 300/300 [00:23<00:00, 12.88it/s]
Grid Search: 100%|██████████| 1/1 [00:03<00:00,  3.28s/it]
