## CLIP avec un classifieur basé sur la forêt aléatoire

In [None]:
import pandas as pd
import os
import torch
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from transformers import CLIPProcessor, CLIPModel
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score, f1_score
import numpy as np
from tqdm import tqdm
import torchvision.transforms as transforms
import csv
import numpy as np
from collections import Counter

In [None]:
# Charger les données
train_df = pd.read_csv("../preprocessing_for_CLIP/train.csv")
test_texts_df = pd.read_csv("../preprocessing_for_CLIP/test_texts.csv")
test_labels_df = pd.read_csv("../preprocessing_for_CLIP/test_labels.csv")
train_folder_path = "../dataset/TRAINING"
test_folder_path = "../dataset/test"

print("Répartition des labels dans le dataset d'entraînement :\n", train_df["label"].value_counts())

Répartition des labels dans le dataset d'entraînement :
 label
0    5000
1    5000
Name: count, dtype: int64


In [3]:
# Charger le modèle CLIP et le processor pour prétraiter les images et textes
print("Chargement du modèle CLIP...")
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
print("Modèle CLIP chargé !")

Chargement du modèle CLIP...
Modèle CLIP chargé !


In [None]:
def image_path_to_tensor(image_path, folder_path):
    # Charger l'image avec PIL
    full_image_path = os.path.join(folder_path, image_path)
    image = Image.open(full_image_path).convert("RGB")
    image_np = np.array(image, dtype=np.float32) / 255.0 

    # Vérifier si les valeurs sont bien dans la plage [0, 1]
    if np.any(image_np < 0) or np.any(image_np > 1):
        print(f"Attention: image avec valeurs hors de la plage [0, 1] dans {image_path}")

    # Utiliser le preprocesseur CLIP pour transformer l'image
    inputs = processor(images=image, return_tensors="pt", padding=True, do_rescale=False)

    return inputs['pixel_values'][0]

def get_clip_embeddings(texts, images):
    # Prétraiter les textes séparément
    text_inputs = processor(text=texts, return_tensors="pt", padding=True, truncation=True, max_length=77)
    
    # Normaliser les images dans la plage [0, 1]
    images_normalized = []
    for image in images:
        if isinstance(image, np.ndarray):
            image = np.clip(image / 255.0, 0.0, 1.0)
        else:
            image = np.clip(np.array(image) / 255.0, 0.0, 1.0)
        images_normalized.append(image)

    # Prétraiter les images séparément
    image_inputs = processor(images=images_normalized, return_tensors="pt", padding=True, do_rescale=False)  # Désactiver la mise à l'échelle

    with torch.no_grad():
        # Obtenir les embeddings des textes et des images
        text_embeds = model.get_text_features(**text_inputs)
        image_embeds = model.get_image_features(**image_inputs)

    return text_embeds, image_embeds

In [5]:
# Dataset pour charger les textes et les images
class MemeDataset(Dataset):
    def __init__(self, texts, images, labels):
        self.texts = texts
        self.images = images
        self.labels = labels

    def __len__(self):
        return len(self.texts)

    def __getitem__(self, idx):
        text = self.texts[idx]
        image = self.images[idx]
        label = self.labels[idx]
        return text, image, label

In [None]:
# Charger les images
print("Chargement des images de train")
train_images = [image_path_to_tensor(image_path, train_folder_path) for image_path in train_df["file_name"]]
print("Chargement des images de test")
test_images = [image_path_to_tensor(image_path, test_folder_path) for image_path in test_texts_df["file_name"]]


Chargement des images de train
Chargement des images de test


In [7]:
# Préparer les datasets
print("Préparation des datasets...")
train_texts = train_df["text"].tolist()
train_labels = train_df["label"].tolist()

test_texts = test_texts_df["text"].tolist()
test_labels = test_labels_df["label"].tolist()

train_dataset = MemeDataset(train_texts, train_images, train_labels)
test_dataset = MemeDataset(test_texts, test_images, test_labels)

Préparation des datasets...


In [None]:
train_loader = DataLoader(train_dataset, batch_size=16, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=16)
print("Préparation des datasets terminée !")

print("Entraînement du modèle de random forest...")

# Entraîner un modèle de random forest sur les embeddings CLIP
n_estimators = 50
max_depth = 10
min_samples_split = 5
min_samples_leaf = 4
max_features = "log2"
random_forest = RandomForestClassifier(n_estimators=n_estimators, max_depth=max_depth,
                                   min_samples_split=min_samples_split, min_samples_leaf=min_samples_leaf,
                                   max_features=max_features, random_state=42)

print("Entraînement du modèle de random forest terminé !")

Préparation des datasets terminée !
Entraînement du modèle de random forest...
Entraînement du modèle de random forest terminé !


In [None]:
# Extraire les embeddings et entraîner le modèle
def extract_embeddings(loader):
    embeddings = []
    labels = []

    for texts, images, label in tqdm(loader, desc="Extraction des embeddings", leave=False):
        text_embeds, image_embeds = get_clip_embeddings(texts, images)
        combined_embeds = torch.cat((text_embeds, image_embeds), dim=-1).numpy()
        embeddings.append(combined_embeds)
        labels.append(label.numpy())
    return np.vstack(embeddings), np.hstack(labels)

In [35]:
# Chemins des fichiers pour sauvegarder les embeddings
train_embeddings_file = "train_embeddings.npz"
test_embeddings_file = "test_embeddings.npz"

# Vérifier si les fichiers existent déjà
if os.path.exists(train_embeddings_file) and os.path.exists(test_embeddings_file):
    print("Chargement des embeddings sauvegardés...")
    train_data = np.load(train_embeddings_file)
    test_data = np.load(test_embeddings_file)
    X_train, y_train = train_data["X"], train_data["y"]
    X_test, y_test = test_data["X"], test_data["y"]
    print("Chargement des embeddings terminé !")
else:
    print("Extraction des embeddings d'entraînement...")
    X_train, y_train = extract_embeddings(train_loader)
    print("Extraction des embeddings de test...")
    X_test, y_test = extract_embeddings(test_loader)
    
    print("Extraction des embeddings terminée !")
    
    # Sauvegarde des embeddings
    np.savez_compressed(train_embeddings_file, X=X_train, y=y_train)
    np.savez_compressed(test_embeddings_file, X=X_test, y=y_test)
    print("Embeddings sauvegardés !")

Chargement des embeddings sauvegardés...
Chargement des embeddings terminé !


In [None]:
# Entraîner le modèle
random_forest.fit(X_train, y_train)

# Prédictions
y_pred_train = random_forest.predict(X_train)
y_pred_test = random_forest.predict(X_test)

# Calculer les métriques
train_acc = accuracy_score(y_train, y_pred_train)
test_acc = accuracy_score(y_test, y_pred_test)

train_f1 = f1_score(y_train, y_pred_train, average='binary')
test_f1 = f1_score(y_test, y_pred_test, average='binary')

print(f"Train Accuracy: {train_acc}")
print(f"Test Accuracy: {test_acc}")
print(f"Train F1 Score: {train_f1}")
print(f"Test F1 Score: {test_f1}")

Train Accuracy: 0.9678
Test Accuracy: 0.664
Train F1 Score: 0.9675337769711635
Test F1 Score: 0.6983842010771992


In [None]:
def format_dict(d):
    return " , ".join(f"{int(k)} : {int(v)}" for k, v in d.items())

# Distribution des prédictions
train_counts = Counter(y_pred_train)
test_counts = Counter(y_pred_test)

# Hyperparamètres et résultats
params = {
    "batch_size": 16,
    "n_estimators": n_estimators,
    "max_depth": max_depth,
    "min_samples_split": min_samples_split,
    "min_samples_leaf": min_samples_leaf,
    "max_features": max_features,
    "train_acc": round(train_acc, 4),
    "train_f1_score": round(train_f1, 4),
    "test_acc": round(test_acc, 4),
    "test_f1_score": round(test_f1, 4)
}

output_file = "hyperparameters_results_rf.csv"

if os.path.exists(output_file):
    # Lire le fichier existant
    with open(output_file, mode='r', newline='', encoding='utf-8-sig') as file:
        reader = csv.reader(file)
        rows = list(reader)

    header = rows[0]
    existing_value_columns = [col for col in header[1:] if col.startswith("valeurs_")]
    if existing_value_columns:
        last_index = max(int(col.split("_")[1]) for col in existing_value_columns)
        new_col_name = f"valeurs_{last_index + 1}"
    else:
        new_col_name = "valeurs_1"

    header.append(new_col_name)

    # Mettre à jour les lignes avec les nouvelles valeurs
    for i in range(1, len(rows)):
        key = rows[i][0]
        if key in params:
            rows[i].append(params[key])
        else:
            rows[i].append("")

    # Écrire les nouvelles données dans le fichier
    with open(output_file, mode='w', newline='', encoding='utf-8-sig') as file:
        writer = csv.writer(file)
        writer.writerows(rows)

else:
    # Fichier n'existe pas encore, le créer et écrire les données
    with open(output_file, mode='w', newline='', encoding='utf-8-sig') as file:
        writer = csv.writer(file)
        writer.writerow(["Hyperparamètres et Résultats", "valeurs_1"])

        for key, value in params.items():
            writer.writerow([key, value])

print(f"Données exportées avec succès dans {output_file}!")


Données exportées avec succès dans hyperparameters_results_rf.csv!
