# Clothes Similarity, inspired from TP1 CV finetuning

## Step 1 : finetuning Resnet50 for classification

### Imports

In [1]:
import os
import yaml
import subprocess
import numpy as np
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt
import platform
import shutil
import cv2
from IPython.display import clear_output
from sklearn.model_selection import train_test_split
from tqdm.notebook import tqdm
from sklearn.manifold import TSNE
from dataset_emma import myDataset
from tensorflow.keras.applications import ResNet50
from tensorflow.keras.models import Sequential, Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, Dense, GlobalAveragePooling2D, BatchNormalization
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.applications.resnet50 import preprocess_input
from keras.preprocessing import image
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import confusion_matrix, accuracy_score, precision_score, recall_score, f1_score




### Récupération des données

Paths

In [2]:
# Charger les configurations depuis le fichier YAML
with open("./config.yaml", 'r') as file:
    try:    
        config = yaml.safe_load(file)
    except yaml.YAMLError as exc:
        print(exc)

# Chemin vers le répertoire de données
data_dir = config["paths"]["data_path"]

# Chemin vers le répertoire des images
images_dir = os.path.join(data_dir, "images")
print('data_dir, images_dir : ', data_dir, images_dir)

# Chemin vers le répertoire de sortie
output_dir = os.path.join(data_dir, 'output')
print("output_dir : ", output_dir)

# Vérification et création du répertoire de sortie
if not os.path.exists(output_dir):
     os.makedirs(output_dir)

data_dir, images_dir :  C:\Users\emend\3A_new\3A_new\Computer_Vision\h-and-m-personalized-fashion-recommendations C:\Users\emend\3A_new\3A_new\Computer_Vision\h-and-m-personalized-fashion-recommendations\images
output_dir :  C:\Users\emend\3A_new\3A_new\Computer_Vision\h-and-m-personalized-fashion-recommendations\output


Création d'un dossier unique contenant les images format 224*224

In [None]:
# # Parcours des sous-répertoires dans le répertoire des images pour créer nouveau répertoire de sortie

# # Liste des sous-répertoires dans le répertoire des images
# dirs_ = os.listdir(images_dir)

# Nombre total d'images à traiter
# n = len(os.listdir(images_dir))

# for idx, dir in enumerate(dirs_):
#     print(f"Dossier {idx}/{n}")
    
#     # Parcours des images dans chaque sous-répertoire
#     for image in tqdm(os.listdir(os.path.join(images_dir, dir))):
#         # Chemin complet de l'image source
#         source_path = os.path.join(images_dir, dir, image)
#         # Chemin complet de l'image destination
#         destination_path = os.path.join(output_dir, image)

#         # Normaliser le chemin source
#         source_path = os.path.normpath(source_path)
#         # Normaliser le chemin destination
#         destination_path = os.path.normpath(destination_path)

#         # Déterminer le système d'exploitation
#         system = platform.system()
#         # Choix de la commande de copie en fonction du système d'exploitation
#         if system == "Windows":
#             copy_command = "copy"
#         else:
#             copy_command = "cp"
        
#         # Copie du fichier
#         shutil.copy(source_path, destination_path)

Chargement des données


In [3]:
dataset = myDataset(output_dir, get_preprocessed_image = False)
img_path = output_dir

Création d'un sous-dossier (facultatif)

In [4]:
img_path = os.path.join(data_dir, 'smaller_dataset')
# dataset.create_smaller_dataset(data_dir, 'smaller_dataset', 2000)

### Préparation X et y

X

In [5]:
# Données X
X_names = os.listdir(img_path)

# Liste pour stocker les images
images = []

# Parcourir chaque nom de fichier dans X_names et lire l'image correspondante
for filename in X_names:
    full_path = os.path.join(img_path, filename)
    # target_size = (224, 224)
    image = cv2.imread(full_path)
    # if image is not None:
    #     image = cv2.resize(image, target_size)
    # else:
    #     print("Impossible de lire l'image.")
    images.append(image)

# Convertir la liste d'images en un tableau numpy
X = np.array(images)

y

In [None]:
# Données y
tabular_dir = config["paths"]["tabular_path"]
label_column = 'product_group_name'

# Charger les labels de classes à partir du CSV
labels_df = pd.read_csv(tabular_dir)[['article_id', label_column]]
class_labels = labels_df[label_column].values 
labels_df['article_id'] = labels_df['article_id'].astype(str)
keys_complete = X_names
keys = [int(image_name[1:-4]) for image_name in keys_complete]

y = []
for idx in keys:
    idx = str(idx)
    if idx in labels_df['article_id'].values:
        y.append(labels_df[labels_df['article_id']==idx][label_column].values[0])
y = np.array(y)

# Instancier l'encodeur
label_encoder = LabelEncoder()

# Adapter l'encodeur aux étiquettes d'entraînement
y = label_encoder.fit_transform(y)

Split train/val/test

In [None]:
# Divisez d'abord les données en ensembles d'entraînement et de test (par exemple 80% train, 20% test)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Ensuite, divisez l'ensemble de test en ensembles de test et de validation (par exemple 50% test, 50% validation)
X_test, X_val, y_test, y_val = train_test_split(X_test, y_test, test_size=0.5, random_state=42)

### Modele

Modèle ResNet50 + finetuning dernières couches

In [None]:
# Charger le modèle ResNet50 pré-entraîné sans la couche de classification (top)
base_model = ResNet50(weights='imagenet', include_top=False)
base_model_output = base_model.layers[-2].output


# Créer vos propres couches de classification
model_top = Sequential([
    GlobalAveragePooling2D(input_shape=base_model.output_shape[1:]),  # Global Average Pooling
    Dense(128, activation='relu'),  # Couche cachée
    Dense(len(np.unique(y)), activation='softmax')  # Couche de sortie avec softmax pour la classification
])

# Combiner le modèle ResNet50 avec vos couches de classification
model = Model(inputs=base_model.input, outputs=model_top(base_model_output))

In [None]:
# Geler les couches du modèle ResNet50 pré-entraîné
for layer in base_model.layers:
    layer.trainable = False

# Compiler le modèle
optimizer = Adam(learning_rate=0.001)
model.compile(optimizer=optimizer,
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# # Définir les taux d'apprentissage pour les couches ResNet50 et Dense, respectivement
# lr_base = 0.001  
# lr_dense = 0.01  

# # Créer un optimiseur avec différents taux d'apprentissage pour chaque couche
# optimizer = Adam(learning_rate=lr_base)
# for layer in model.layers:
#     if layer.name.startswith('resnet'):
#         optimizer.lr.assign(lr_base)
#     else:
#         optimizer.lr.assign(lr_dense)

# # Compiler le modèle
# model.compile(optimizer=optimizer,
#               loss='sparse_categorical_crossentropy',
#               metrics=['accuracy'])

Train

In [None]:
import tensorflow as tf
tf.config.run_functions_eagerly(True)

In [None]:
# Entraîner le modèle avec mini-batchs
batch_size = 32
epochs = 10
history = model.fit(X_train, y_train, 
                    batch_size=batch_size, 
                    epochs=epochs, 
                    validation_data=(X_val, y_val))

In [None]:
# model.save("resnet_finetuned.h5")

Evaluate

In [None]:
# Évaluation du modèle sur l'ensemble de test
test_loss, test_acc = model.evaluate(X_test, y_test)
print('Test accuracy:', test_acc)

Scores

In [None]:
y_pred = model.predict(X_test)
y_pred_classes = np.argmax(y_pred, axis=1)

conf_matrix = confusion_matrix(y_test, y_pred_classes)
accuracy = accuracy_score(y_test, y_pred_classes)
precision = precision_score(y_test, y_pred_classes, average='weighted')
recall = recall_score(y_test, y_pred_classes, average='weighted')
f1 = f1_score(y_test, y_pred_classes, average='weighted')

print('Matrice de confusion :\n', conf_matrix)
print('Accuracy :', accuracy)
print('Précision :', precision)
print('Rappel :', recall)
print('F1-score :', f1)