In [None]:
import os
import numpy as np
import tensorflow as tf
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report, confusion_matrix

import keras

from tensorflow.keras.layers import RandomTranslation, RandomZoom, RandomRotation

from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, GlobalAveragePooling2D, Dropout
from tensorflow.keras.applications.vgg16 import preprocess_input

from tensorflow.keras.callbacks import ReduceLROnPlateau, EarlyStopping, ModelCheckpoint

import matplotlib.pyplot as plt
import seaborn as sns

import sys
from pathlib import Path

# Make the parent of the current working directory importable (the `src` package is imported from it just below).
project_root = Path().resolve().parent
already_on_path = any(Path(entry).resolve() == project_root for entry in sys.path)
if not already_on_path:
    sys.path.append(str(project_root))

from src import PATHS
from src.visualization.visualize import draw_spider_graph_dark, conf_matrix_dark

## Pour ne travailler que sur un échantillon :

In [None]:
# Root folder of the converted JPEG images.
converted_prefix = os.path.join(project_root, 'data', 'converted')

# Load the sample metadata, derive the JPEG path of each document from its
# `rvl_image_path` (drop the "raw/" prefix, swap the .tif extension for .jpg),
# then discard the columns that are not used afterwards.
sample = (
    pd.read_parquet(
        os.path.join(project_root, 'data', 'metadata', 'samples', 'df_documents_sample_40k_1.parquet'),
        engine='fastparquet',
    )
    .assign(
        filepath=lambda df: df['rvl_image_path'].apply(
            lambda p: os.path.join(converted_prefix, p.replace("raw/", "").replace(".tif", ".jpg"))
        )
    )
    .drop(columns=[
        'rvl_image_path',
        'document_id',
        'filename',
        'iit_image_path',
        'iit_individual_xml_path',
        'iit_collective_xml_path',
    ])
)

## Création des sets train, test et validation

In [None]:
# 1. Integer-encode the labels (author's open question: is this step actually needed?)
label_encoder = LabelEncoder()
sample['label_encoded'] = label_encoder.fit_transform(sample['label'])

# 2. Derive the three subsets from `sample` using its `data_set` column
df_train, df_val, df_test = (
    sample[sample['data_set'] == split_name]
    for split_name in ('train', 'val', 'test')
)

# 3. Load one image from disk and prepare it for VGG16
def process_image(file_path, label):
    """Read a JPEG file and return it preprocessed for VGG16, together with its label.

    Args:
        file_path: Path of the JPEG file to read.
        label: Label associated with the image; returned unchanged.

    Returns:
        A tuple ``(image, label)`` where ``image`` is a 224x224x3 float tensor
        passed through ``vgg16.preprocess_input``.
    """
    image = tf.io.read_file(file_path)
    image = tf.image.decode_jpeg(image, channels=3)  # VGG16 expects 3 channels
    image = tf.image.resize(image, [224, 224])  # input size used by the model
    # The ImageNet VGG16 weights were trained on inputs prepared by
    # `preprocess_input` (RGB -> BGR, per-channel mean subtraction, no scaling).
    # Dividing by 255 instead feeds the pretrained convolutions values far
    # outside the range they were trained on.
    image = preprocess_input(image)
    return image, label

# 4. Build the tf.data pipeline
def get_dataset(df_subset, shuffle=False, batch_size=32):
    """Build a batched, prefetched ``tf.data.Dataset`` of (image, label) pairs.

    Args:
        df_subset: DataFrame with a ``filepath`` column and a ``label_encoded`` column.
        shuffle: When True, the examples are shuffled (reshuffled at every
            iteration, which is the ``Dataset.shuffle`` default).
        batch_size: Number of examples per batch (default 32, the value that
            was previously hard-coded).

    Returns:
        A dataset yielding batches of ``process_image`` outputs.
    """
    file_paths = df_subset['filepath'].values
    labels = df_subset['label_encoded'].values
    dataset = tf.data.Dataset.from_tensor_slices((file_paths, labels))

    if shuffle:
        # Shuffle the lightweight (path, label) pairs *before* decoding, with a
        # buffer covering the whole subset. Shuffling after the map with a
        # 1000-element buffer kept 1000 decoded images in memory and only mixed
        # neighbouring rows of the dataframe.
        dataset = dataset.shuffle(buffer_size=max(len(file_paths), 1))

    dataset = dataset.map(process_image, num_parallel_calls=tf.data.AUTOTUNE)
    return dataset.batch(batch_size).prefetch(tf.data.AUTOTUNE)

# Only the training pipeline is shuffled; validation and test are built with shuffle left at its default (False).
train_ds, val_ds, test_ds = (
    get_dataset(df_train, shuffle=True),
    get_dataset(df_val),
    get_dataset(df_test),
)

In [None]:
# Careful: this is the number of batches of 32 images, and one of them is not completely full, so this count is expected.
len(train_ds)

## On récupère le modèle VGG16 pour le réentraîner sur nos images

In [None]:
# VGG16 convolutional base with ImageNet weights, without its classification head
base_model = keras.applications.VGG16(weights='imagenet', include_top=False)

# Fine-tune only the last 4 layers of the base.
# The base model itself must stay trainable: in tf.keras 2, a model whose own
# `trainable` flag is False reports no trainable weights, even if some of its
# layers were switched back to trainable afterwards. Marking the base trainable
# and freezing everything except the last 4 layers gives the intended result
# on both Keras 2 and Keras 3.
base_model.trainable = True
for layer in base_model.layers[:-4]:
    layer.trainable = False


# Build the model with the Functional API
inputs = Input(shape=(224, 224, 3))

# Data augmentation (currently disabled)
#x = RandomTranslation(height_factor=0.1, width_factor=0.1)(inputs)
#x = RandomZoom(0.1)(x)

# VGG16 features -> global average pooling -> two Dense+Dropout stages -> 16-way softmax
x = GlobalAveragePooling2D()(base_model(inputs))
for units in (1024, 512):
    x = Dense(units, activation='relu')(x)
    x = Dropout(rate=0.2)(x)
outputs = Dense(16, activation='softmax')(x)

model = Model(inputs=inputs, outputs=outputs)

In [None]:
# Adam with a learning rate of 1e-5; the sparse loss takes the integer-encoded labels directly.
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
# Stop training once val_loss has failed to decrease by at least 0.001
# (an absolute amount, not a percentage) for 5 consecutive epochs.
early_stopping = EarlyStopping(
    monitor='val_loss',
    mode='min',          # only a decrease of val_loss counts as an improvement
    min_delta=0.001,
    patience=5,
    verbose=1,           # print the epoch at which training stops
)

# Halve the learning rate when val_loss plateaus.
# The patience must be shorter than EarlyStopping's: with the same patience and
# min_delta both callbacks fire on the very same epoch, so the reduced learning
# rate was never used for a single training step. With patience=2 and
# cooldown=3, three epochs run at the reduced rate before early stopping can trigger.
reduce_learning_rate = ReduceLROnPlateau(
    monitor="val_loss",
    patience=2,
    min_delta=0.001,
    factor=0.5,
    cooldown=3,
    min_lr=1e-6,
    verbose=1,
)

# Keep on disk the model with the lowest val_loss seen so far.
checkpoint = ModelCheckpoint(
    'best_model.keras',
    monitor='val_loss',
    save_best_only=True,
    verbose=1,
)

In [None]:
training_callbacks = [reduce_learning_rate, early_stopping, checkpoint]

history_model = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=50,
    callbacks=training_callbacks,
)
# Author's note: "I do not touch the validation set for now, only once I have satisfactory results".
# NOTE(review): val_ds is used right here, so the held-out test set is presumably what was meant - confirm.

In [None]:
# Save the best model.
# `model` holds the weights of the last epoch that ran (EarlyStopping is created
# without restore_best_weights), which is several epochs past the best one when
# training stops early. The ModelCheckpoint callback wrote the lowest-val_loss
# model to 'best_model.keras', so export that one.
best_model = keras.saving.load_model('best_model.keras')
best_model.save(os.path.join(project_root, 'models', 'VGG16_best_50_epocs_sample_40_000.keras'))

In [None]:
# Training curves: loss on the left, accuracy on the right.
# The second curve of each panel comes from `validation_data`, so it is labelled
# 'validation' (it was previously mislabelled 'test').
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 4))

for ax, metric in ((ax_loss, 'loss'), (ax_acc, 'accuracy')):
    ax.plot(history_model.history[metric], label='train')
    ax.plot(history_model.history[f'val_{metric}'], label='validation')
    ax.set_title(f'Model {metric} by epoch')
    ax.set_ylabel(metric)
    ax.set_xlabel('epoch')
    ax.legend(loc='right')

plt.show()

## On évalue la qualité de notre modèle

In [None]:
# Reload the model file written by the save cell above.
first_model_path = os.path.join(project_root, 'models', 'VGG16_best_50_epocs_sample_40_000.keras')
loaded_model = keras.saving.load_model(first_model_path)

In [None]:
# Step 1: predict on the test set
y_pred_probs = loaded_model.predict(test_ds)  # class probabilities
y_pred = np.argmax(y_pred_probs, axis=1)  # predicted classes

# Step 2: true labels, in the same order as the predictions.
# test_ds is built from df_test without shuffling, so the dataframe column is
# already in prediction order; iterating test_ds a second time would read and
# decode every image again just to collect the labels.
y_true = df_test['label_encoded'].to_numpy()
assert len(y_true) == len(y_pred), "test labels and predictions are misaligned"

# Step 3: classification report
print("\n Rapport de classification :")
print(classification_report(y_true, y_pred))



In [None]:
# Step 4: confusion matrix, drawn as an annotated heatmap
print("Matrice de confusion :")
cm = confusion_matrix(y_true, y_pred)

fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_title("Matrice de confusion")
ax.set_xlabel("Classe pr√©dite")
ax.set_ylabel("Classe r√©elle")
plt.show()

In [None]:
# Project helper from src.visualization.visualize, called with the confusion matrix and an image path.
# NOTE(review): presumably renders a dark-themed figure and saves it to that path, relative to the working directory - confirm.
conf_matrix_dark(cm, "illustrations/vgg16_1_cm.png")


In [None]:
# Project helper from src.visualization.visualize, called with true/predicted labels and a save path.
# NOTE(review): presumably draws a per-class radar chart and saves it relative to the working directory - confirm.
draw_spider_graph_dark(y_true, y_pred, save_path="illustrations/vgg16_1_spider.png")


# Maintenant, on essaie d'encore améliorer ça en faisant un progressive unfreezing

In [None]:
from tensorflow.keras.applications import VGG16
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, GlobalAveragePooling2D, Dense, Dropout

In [None]:
# Load the VGG16 base model (ImageNet weights, no classification head)
base_model = VGG16(weights='imagenet', include_top=False)

# Start with only the last 4 layers trainable.
# The base model itself must stay trainable: in tf.keras 2, a model whose own
# `trainable` flag is False reports no trainable weights, whatever the flags of
# its layers, so layers unfrozen one by one would never be updated. Marking the
# base trainable and freezing everything except the last 4 layers works on both
# Keras 2 and Keras 3.
base_model.trainable = True
for layer in base_model.layers[:-4]:
    layer.trainable = False

# Full architecture: VGG16 base called with training=False, global average
# pooling, two Dense+Dropout stages and a 16-class softmax output.
inputs = Input(shape=(224, 224, 3))
x = GlobalAveragePooling2D()(base_model(inputs, training=False))
for units in (1024, 512):
    x = Dense(units, activation='relu')(x)
    x = Dropout(0.2)(x)
outputs = Dense(16, activation='softmax')(x)  # 16 classes
model = Model(inputs, outputs)


# Compile with Adam at a learning rate of 1e-4
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-4)
model.compile(
    optimizer=optimizer,
    loss='sparse_categorical_crossentropy',
    metrics=['accuracy'],
)

In [None]:
#les callbacks
from tensorflow.keras.callbacks import Callback
import tensorflow.keras.backend as K

class SafeUnfreezeCallback(Callback):
    """Progressively unfreeze a base model when the validation loss stalls.

    After `patience` consecutive epochs without a `val_loss` improvement of at
    least `min_delta`, the next `unfreeze_step` layers (counted from the end of
    `base_model.layers`) are made trainable, the optimizer learning rate is
    halved (never below 1e-5), training is stopped and `trigger_reload` is set
    so that the caller can reload its best checkpoint and start a new phase.

    Args:
        base_model: Model whose layers are unfrozen step by step.
        model_path: Checkpoint path; stored on the instance but not read by the
            callback itself.
        unfreeze_step: Number of layers handled each time the callback fires.
        max_unfreeze: The callback stops unfreezing once `current_unfrozen`
            reaches this value.
        patience: Number of non-improving epochs tolerated before firing.
        min_delta: Minimum decrease of `val_loss` counted as an improvement.
    """

    def __init__(self, base_model, model_path='best_model.keras',
                 unfreeze_step=5, max_unfreeze=30,
                 patience=5, min_delta=0.001):
        super().__init__()
        self.base_model = base_model
        self.model_path = model_path
        self.unfreeze_step = unfreeze_step
        self.max_unfreeze = max_unfreeze
        self.patience = patience
        self.min_delta = min_delta
        # Number of layers, counted from the end of the base model, already
        # handled. Starts at 4 - presumably the layers unfrozen when the model
        # is built; keep both values in sync.
        self.current_unfrozen = 4
        self.best_val_loss = float('inf')
        self.wait = 0
        self.trigger_reload = False

    def on_epoch_end(self, epoch, logs=None):
        """Track `val_loss` and unfreeze the next group of layers on a plateau."""
        # Keras may call the hook with logs=None; treat it like "no val_loss".
        val_loss = (logs or {}).get('val_loss')
        if val_loss is None:
            return

        if val_loss < self.best_val_loss - self.min_delta:
            self.best_val_loss = val_loss
            self.wait = 0
        else:
            self.wait += 1

        if self.wait < self.patience or self.current_unfrozen >= self.max_unfreeze:
            return

        total_layers = len(self.base_model.layers)
        end = total_layers - self.current_unfrozen
        if end <= 0:
            # Every layer of the base model has already been visited. Without
            # this guard, a `max_unfreeze` larger than the number of layers made
            # the callback halve the learning rate and interrupt training at
            # every plateau although nothing was left to unfreeze.
            return
        start = max(end - self.unfreeze_step, 0)

        unfrozen = 0
        for layer in self.base_model.layers[start:end]:
            # Important: never unfreeze BatchNormalization layers
            if not isinstance(layer, tf.keras.layers.BatchNormalization):
                layer.trainable = True
                unfrozen += 1

        # Advance by the size of the visited window, not by the number of
        # layers actually unfrozen; otherwise a skipped BatchNormalization layer
        # makes the next window overlap the current one.
        self.current_unfrozen += end - start
        print(f"\nüîì D√©gel de {unfrozen} couches suppl√©mentaires (total d√©gel√©es : {self.current_unfrozen})")

        # Halve the learning rate, with a floor at 1e-5
        old_lr = float(K.get_value(self.model.optimizer.learning_rate))
        new_lr = max(old_lr * 0.5, 1e-5)
        try:
            K.set_value(self.model.optimizer.learning_rate, new_lr)
        except AttributeError:
            # Fallback when the learning rate cannot be assigned in place:
            # rebuild the optimizer with the new value.
            print("‚ö†Ô∏è Impossible de modifier le learning rate ‚Äî mauvais type. Recr√©ation de l'optimiseur avec le nouveau LR.")
            self.model.compile(
                optimizer=tf.keras.optimizers.Adam(learning_rate=new_lr),
                loss=self.model.loss,
                metrics=self.model.metrics,
            )
        print(f"üìâ Nouveau learning rate : {old_lr:.2e} ‚Üí {new_lr:.2e}")

        # Stop this fit() call so the caller can reload the best model
        print("‚ö†Ô∏è Entra√Ænement interrompu ‚Üí rechargement du meilleur mod√®le")
        self.model.stop_training = True
        self.trigger_reload = True

    def reset(self):
        """Clear the plateau tracking state before a new training phase."""
        self.wait = 0
        self.best_val_loss = float('inf')
        self.trigger_reload = False

# Stop a training phase once val_loss has failed to decrease by at least 0.001
# (an absolute amount) for 5 consecutive epochs; verbose=1 prints the stopping epoch.
early_stopping = EarlyStopping(
    monitor='val_loss',
    mode='min',
    min_delta=0.001,
    patience=5,
    verbose=1,
)

# Halve the learning rate after 4 epochs without improvement, never going below 1e-6.
reduce_lr = ReduceLROnPlateau(
    monitor="val_loss",
    factor=0.5,
    patience=4,
    min_delta=0.001,
    cooldown=3,
    min_lr=1e-6,
    verbose=1,
)

# Keep on disk the model with the lowest val_loss seen so far.
checkpoint = ModelCheckpoint(
    'best_model.keras',
    monitor='val_loss',
    save_best_only=True,
    verbose=1,
)

# Progressive unfreezing of the VGG16 base, 5 layers at a time.
unfreeze_cb = SafeUnfreezeCallback(
    base_model=base_model,
    unfreeze_step=5,
    max_unfreeze=100,
    patience=5,
    min_delta=0.001,
)

In [None]:
max_rounds = 10  # maximum number of training phases
combined_history = {key: [] for key in ('loss', 'val_loss', 'accuracy', 'val_accuracy')}


for round_idx in range(max_rounds):
    print(f"\nüîÅ Phase d'entra√Ænement {round_idx + 1}")

    # Start every phase with a clean plateau-tracking state
    unfreeze_cb.reset()

    # Reload the best checkpoint, then recompile with the optimizer's current
    # learning rate so the trainable flags changed by the callback are applied
    model.load_weights('best_model.keras')
    current_lr = float(K.get_value(model.optimizer.learning_rate))
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=current_lr),
        loss='sparse_categorical_crossentropy',
        metrics=['accuracy'],
    )

    history = model.fit(
        train_ds,
        validation_data=val_ds,
        epochs=15,
        callbacks=[unfreeze_cb, early_stopping, reduce_lr, checkpoint],
    )

    # Append this phase's curves to the running history
    for key, values in combined_history.items():
        values.extend(history.history.get(key, []))

    # The unfreeze callback asked for another phase: keep looping
    if unfreeze_cb.trigger_reload:
        continue

    print("\n‚úÖ Entra√Ænement termin√© ‚Äî plus de couches √† d√©geler ou am√©lioration suffisante.")
    break

In [None]:
# Training curves accumulated over all phases: loss on the left, accuracy on the right.
# The second curve of each panel comes from `validation_data`, so it is labelled
# 'validation' (it was previously mislabelled 'test').
fig, (ax_loss, ax_acc) = plt.subplots(1, 2, figsize=(12, 4))

for ax, metric in ((ax_loss, 'loss'), (ax_acc, 'accuracy')):
    ax.plot(combined_history[metric], label='train')
    ax.plot(combined_history[f'val_{metric}'], label='validation')
    ax.set_title(f'Model {metric} by epoch')
    ax.set_ylabel(metric)
    ax.set_xlabel('epoch')
    ax.legend(loc='right')

plt.show()

In [None]:
# Save the failed model anyway (this is `model` as left by the last training phase)
failed_model_path = os.path.join(project_root, 'models', 'VGG16_best_failed_degel.keras')
model.save(failed_model_path)

In [None]:
# NOTE(review): this file name differs from the one written by the save cell
# above ('VGG16_best_failed_degel.keras'); it presumably comes from an earlier
# run - confirm that it exists and is the model meant to be evaluated.
progressive_model_path = os.path.join(project_root, 'models', 'VGG16_best_progressive_unfreeze_40_000.keras')
loaded_model = keras.saving.load_model(progressive_model_path)

In [None]:
# Step 1: predict on the test set
y_pred_probs = loaded_model.predict(test_ds)  # class probabilities
y_pred = np.argmax(y_pred_probs, axis=1)  # predicted classes

# Step 2: true labels, in the same order as the predictions.
# test_ds is built from df_test without shuffling, so the dataframe column is
# already in prediction order; iterating test_ds a second time would read and
# decode every image again just to collect the labels.
y_true = df_test['label_encoded'].to_numpy()
assert len(y_true) == len(y_pred), "test labels and predictions are misaligned"

# Step 3: classification report
print("\n Rapport de classification :")
print(classification_report(y_true, y_pred))



In [None]:
# Step 4: confusion matrix, drawn as an annotated heatmap
print("Matrice de confusion :")
cm = confusion_matrix(y_true, y_pred)

fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(cm, annot=True, fmt='d', cmap='Blues', ax=ax)
ax.set_title("Matrice de confusion")
ax.set_xlabel("Classe pr√©dite")
ax.set_ylabel("Classe r√©elle")
plt.show()

In [None]:
# Project helper from src.visualization.visualize, called with the confusion matrix and an image path.
# NOTE(review): presumably renders a dark-themed figure and saves it to that path, relative to the working directory - confirm.
conf_matrix_dark(cm, "illustrations/vgg16_2_cm.png")

In [None]:
# Project helper from src.visualization.visualize, called with true/predicted labels and a save path.
# NOTE(review): presumably draws a per-class radar chart and saves it relative to the working directory - confirm.
draw_spider_graph_dark(y_true, y_pred, save_path="illustrations/vgg16_2_spider.png")
