In [1]:
import config
import data
import os

from transformers import TFAutoModel
import tensorflow as tf

from keras.models import Model
from keras.utils.vis_utils import plot_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import (
    Concatenate,
    Conv1D,
    Dense,
    Dropout,
    # Embedding,
    Flatten,
    Input,
    MaxPooling1D,
)

import json
import numpy as np
import pprint



In [2]:

# Import 
import os
import config
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns

import tensorflow as tf
from transformers import TFCamembertModel
from sklearn.metrics import (
    accuracy_score, 
    confusion_matrix, 
    ConfusionMatrixDisplay,
    f1_score, 
    precision_score, 
    recall_score, 
    roc_curve,
    roc_auc_score 
)


def calculate_estimated_year_tensor(intervals, probabilities):
    def get_bounds(interval: str):
        start, end = interval.strip("[]()").split(", ")
        return int(start), int(end)
    
    values = []
    for i, interval in enumerate(intervals):
        start, end = get_bounds(interval)
        if i == 0:
            values.append(start)
        elif i == len(intervals) - 1:
            values.append(end)
        else:
            values.append((start + end) / 2)
    
    values_tensor = tf.constant(values, dtype=tf.float32)
    estimated_date = tf.reduce_sum(values_tensor * probabilities, axis=1)
    estimated_year = tf.round(estimated_date)
    
    return estimated_year


def custom_loss(y_true, y_pred):
    # Obtenez les années estimées en utilisant les prédictions fournies par le modèle
    # intervals = ['[1825, 1850)', '[1850, 1875)', '[1875, 1900)', '[1900, 1925)', '[1925, 1950)', '[1950, 1975)', '[1975, 2000)', '[2000, 2024)']
    estimated_years = calculate_estimated_year_tensor(list(config.DATE_MAP.values()), y_pred)
    # estimated_years = calculate_estimated_year_tensor(intervals, y_pred)
    # Convertir y_true en float32 sans changer la shape
    y_true_float = tf.cast(y_true, tf.float32)
    # Reshape y_true_float : passer de [16 1] à [16]
    y_true_float = tf.reshape(y_true_float, [-1])
    # Calculez les erreurs carrées entre les années estimées et les valeurs réelles
    squared_errors = tf.square(estimated_years - y_true_float)
    # Sommez les carrés des erreurs pour obtenir la somme totale des erreurs
    sum_of_squared_errors = tf.reduce_sum(squared_errors)
    # Calculer le nombre total d'échantillons
    num_samples = tf.cast(tf.shape(y_true_float)[0], tf.float32)
    # Calculer la moyenne des erreurs carrées
    mean_squared_error = sum_of_squared_errors / num_samples
    
    return mean_squared_error


def custom_objects_dict():
    cutom_objects = {
        "TFCamembertModel": TFCamembertModel, 
        "custom_loss": custom_loss, 
        "custom_metric": custom_metric,
    }
    return cutom_objects


def custom_metric(y_true, y_pred):
    # intervals = ['[1825, 1850)', '[1850, 1875)', '[1875, 1900)', '[1900, 1925)', '[1925, 1950)', '[1950, 1975)', '[1975, 2000)', '[2000, 2024)']
    estimated_years = calculate_estimated_year_tensor(list(config.DATE_MAP.values()), y_pred)
    # estimated_years = calculate_estimated_year_tensor(intervals, y_pred)
    # Convertir y_true en float32 sans changer la shape
    y_true_float = tf.cast(y_true, tf.float32)
    # Aplatir y_true pour correspondre à estimated_years
    y_true_flat = tf.reshape(y_true_float, [-1])
    # Calcul de la précision
    correct_predictions = tf.abs(estimated_years - y_true_flat) <= 25
    accuracy = tf.reduce_mean(tf.cast(correct_predictions, tf.float32))
    return accuracy


def evaluate_model(df: pd.DataFrame, confusion_matrix_output: str, roc_curve_output: str):
    plt.style.use("seaborn-v0_8-whitegrid")
    true_sexe = np.array(df["true sexe"].tolist())
    pred_sexe = np.array(df["pred sexe"].tolist())
    true_date = np.array(df["true date"].tolist())

    # Sexe metrics
    accuracy_sexe = accuracy_score(true_sexe, pred_sexe)
    precision_sexe = precision_score(true_sexe, pred_sexe)
    recall_sexe = recall_score(true_sexe, pred_sexe)
    f1_sexe = f1_score(true_sexe, pred_sexe)
    auc_sexe = roc_auc_score(true_sexe, pred_sexe)

    print(f"\nSexe - Accuracy: {accuracy_sexe} \nPrecision: {precision_sexe} \nRecall: {recall_sexe} \nF1 Score: {f1_sexe} \nAUC: {auc_sexe}\n")

    # Enregistrement des métriques dans un .json
    metrics = {
        "accuracy_sexe": accuracy_sexe,
        "precision_sexe": precision_sexe,
        "recall_sexe": recall_sexe,
        "f1_sexe": f1_sexe,
        "auc_sexe": auc_sexe
    }

    with open(config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_predictions_metrics.json", "w") as json_file:
        json.dump(metrics, json_file, indent=4)

    # Confusion matrix

    plt.style.use("seaborn-v0_8-white")
    # Convert predictions to binary
    pred_binary = (pred_sexe > 0.5).astype(int)
    # Calculate confusion matrix
    cm = confusion_matrix(true_sexe, pred_binary)
    # Normalize the confusion matrix to display rates
    cm_normalized = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
    # Plot confusion matrix with updated display labels
    disp = ConfusionMatrixDisplay(confusion_matrix=cm_normalized, display_labels=["Femme", "Homme"])
    disp.plot(cmap=plt.cm.Blues, values_format=".2f")  # Set the colormap to Blues and format to 2 decimal places
    plt.title(f"Matrice de confusion - Sexe")
    plt.savefig(confusion_matrix_output)
    plt.close()

    # Plot ROC curve
    fpr, tpr, _ = roc_curve(true_sexe, pred_sexe)
    plt.figure()
    plt.plot(fpr, tpr, color="blue", lw=2, label="ROC curve (AUC = %0.2f)" % auc_sexe)
    plt.plot([0, 1], [0, 1], color="grey", lw=2, linestyle="--")
    plt.xlim([0.0, 1.0])
    plt.ylim([0.0, 1.05])
    plt.xlabel("False Positive Rate")
    plt.ylabel("True Positive Rate")
    plt.title("Receiver Operating Characteristic - Sexe")
    plt.legend(loc="lower right")
    plt.savefig(roc_curve_output)
    plt.close()

    return accuracy_sexe, precision_sexe, recall_sexe, f1_sexe, auc_sexe


def prediction(model, input_data, sexe_label, date_label=None):
    test_predictions = model.predict(input_data)

    if len(test_predictions) == 2:
        # Unpack predictions
        pred_sexe = np.squeeze((test_predictions[0] > 0.5).astype("int32"))
        pred_date = test_predictions[1]
    else:
        pred_sexe = np.squeeze((test_predictions > 0.5).astype("int32"))
        
    true_sexe = np.array(sexe_label).astype("int32")
    true_date = np.array(date_label).astype("int32") if date_label is not None else None
    
    df = pd.DataFrame({
        "true sexe": true_sexe,
        "pred sexe": pred_sexe,
        "true date": true_date,
        # "pred date": pred_date,
    })
    return df


def save_accuracy_by_interval_and_gender(df: pd.DataFrame, output_file: str):
    """
    Prépare les données pour calculer l'accuracy pour chaque sexe et chaque intervalle,
    puis trace un histogramme à barres pour l'accuracy par intervalle et sexe.

    Arguments :
    df : DataFrame - Le DataFrame contenant les données à analyser.
    """
    plt.style.use("seaborn-v0_8-whitegrid")
    # Préparer les données pour l'accuracy
    intervals = df["interval"].unique()
    data_for_plot = []

    for interval in intervals:
        df_interval = df[df["interval"] == interval]
        for sex in [0, 1]:  # 0: Femme, 1: Homme
            df_sex = df_interval[df_interval["true sexe"] == sex]
            cm = confusion_matrix(df_sex["true sexe"], df_sex["pred sexe"])
            total = cm.sum()
            correct_predictions = cm.trace()  # Sum of True Positives and True Negatives
            accuracy = correct_predictions / total
            # gender = "Femme" if sex == 0 else "Homme"
            gender = "Femme" if sex == 0 else "Homme"
            data_for_plot.append((interval, gender, accuracy))
    
    df_accuracy = pd.DataFrame(data_for_plot, columns=["Interval", "Gender", "Accuracy"])

    # Tracer l'histogramme à barres pour l'accuracy par intervalle et sexe
    plt.figure(figsize=(14, 8))
    ax = sns.barplot(x="Interval", y="Accuracy", hue="Gender", data=df_accuracy, palette=["#FF7F0E", "#1F77B4"])
    plt.xlabel("Intervalles")
    plt.ylabel("Taux d\'accuracy")
    plt.title("Taux d\'accuracy par intervalle et sexe")
    plt.xticks(rotation=45)
    # plt.legend(title="Sexe")
    plt.legend(title="Sexe", loc="upper right", frameon=True)
    # plt.legend(title="Sexe", loc='upper right', fontsize=12, title_fontsize=14, frameon=True, fancybox=False, framealpha=1, shadow=False, borderpad=1)
    plt.ylim(0, 1)  # Limiter l'axe Y entre 0 et 1

    # Ajouter les valeurs d'accuracy au-dessus de chaque barre
    for p in ax.patches:
        if p.get_height() != 0.00:
            ax.annotate(f"{p.get_height():.2f}", 
                    (p.get_x() + p.get_width() / 2., p.get_height()), 
                    ha="center", va="bottom", fontsize=8, color="black", xytext=(0, 5), textcoords="offset points")

    plt.savefig(output_file)
    plt.close()


def save_hist_confusion_matrix(df: pd.DataFrame, output_file: str):
    """
    Prépare les données et trace l'histogramme de la matrice de confusion normalisée par intervalle et sexe.

    Arguments :
    df : DataFrame - Le DataFrame contenant les données à analyser.

    Renvoie :
    None
    """
    def prepare_confusion_data(df: pd.DataFrame):
        """
        Prépare les données pour la matrice de confusion normalisée.

        Arguments :
        df : DataFrame - Le DataFrame contenant les données à analyser.

        Renvoie :
        DataFrame - Un DataFrame contenant les données de la matrice de confusion normalisée.
        """
        # Obtenir les intervalles uniques
        intervals = df["interval"].unique()
        data_for_plot = []

        # Parcourir chaque intervalle
        for interval in intervals:
            # Filtrer le DataFrame pour l'intervalle actuel
            df_interval = df[df["interval"] == interval]
            # Parcourir les deux sexes (0: Femme, 1: Homme)
            for sex in [0, 1]:
                # Filtrer le DataFrame pour le sexe actuel
                df_sex = df_interval[df_interval["true sexe"] == sex]
                # Calculer la matrice de confusion
                cm = confusion_matrix(df_sex["true sexe"], df_sex["pred sexe"])
                # Calculer le total des prédictions
                total = cm.sum()
                print("total")
                # Normaliser la matrice de confusion
                cm_normalized = cm / total
                # Déterminer le genre (Femme ou Homme)
                gender = "Femme" if sex == 0 else "Homme"
                # Parcourir la matrice de confusion normalisée
                for i, row in enumerate(cm_normalized):
                    for j, val in enumerate(row):
                        # Déterminer la catégorie (Vrai ou Faux)
                        category = "True" if i == j else "False"
                        data_for_plot.append((interval, f"{category} {gender}", val))
        
        # Créer un DataFrame à partir des données
        return pd.DataFrame(data_for_plot, columns=["Interval", "Category", "Value"])
    plt.style.use("seaborn-v0_8-whitegrid")
    # Préparer les données
    df_plot = prepare_confusion_data(df)

    # Utiliser la palette tab10 de seaborn
    colors = sns.color_palette("tab10")

    # Créer une palette avec les couleurs spécifiées pour chaque catégorie
    custom_palette = {"True Femme": colors[1], "True Homme": colors[0]}
    palette = {**custom_palette, **{category: colors[i+2] for i, category in enumerate(df_plot["Category"].unique()) if category not in custom_palette}}

    # Tracer l'histogramme à barres avec la palette personnalisée
    plt.figure(figsize=(14, 8))
    ax = sns.barplot(x="Interval", y="Value", hue="Category", data=df_plot, palette=palette)
    plt.xlabel("Intervalles")
    plt.ylabel("Proportion")
    plt.title("Histogramme des matrices de confusions par intervalle et sexe")
    plt.xticks(rotation=45)
    # plt.legend(title="Prédictions")
    plt.legend(title="Prédictions", loc="upper right", frameon=True)
    plt.ylim(0, 1)  # Limiter l'axe Y entre 0 et 1

    # Ajouter les valeurs normalisées au-dessus de chaque barre
    for p in ax.patches:
        if p.get_height() != 0.00:
            ax.annotate(f'{p.get_height():.2f}', 
                        (p.get_x() + p.get_width() / 2., p.get_height()), 
                        ha="center", va="bottom", fontsize=8, color="black", xytext=(3, 5), textcoords="offset points")

    plt.savefig(output_file)
    plt.close()


def save_training_history(history_data: dict, save_dir: str="plots", base_filename: str="plot", single_figure: bool=False):
    """
    Save the training history for a model and save the figures.

    Parameters:
    history (History): The training history returned by model.fit()
    save_dir (str): Directory to save the plots
    base_filename (str): Base filename for the saved plots
    single_figure (bool): Whether to save all plots in a single figure
    """
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)

    # Function to annotate values on the plot
    def annotate_values(ax, x, y):
        for i, txt in enumerate(y):
            ax.annotate(round(txt, 2), (x[i], y[i]), textcoords="offset points", xytext=(0, 5), ha="center")

    # Set the style
    plt.style.use("seaborn-v0_8-whitegrid")

    if single_figure:
        # Determine the number of unique keys (excluding validation keys)
        unique_keys = set(key.replace("val_", "") for key in history_data.keys())
        num_plots = len(unique_keys)
        fig, axs = plt.subplots(num_plots, 1, figsize=(15, num_plots * 5))
        
        if num_plots == 1:
            axs = [axs]

        for i, key in enumerate(unique_keys):
            train_key = key
            val_key = "val_" + key

            axs[i].plot(history_data[train_key], label=f"Training {train_key.replace('_', ' ').title()}", marker="o")
            if val_key in history_data:
                axs[i].plot(history_data[val_key], label=f"{val_key.replace('_', ' ').title()}", marker="o")
            
            axs[i].set_xlabel("Epochs")
            axs[i].set_ylabel(train_key.split("_")[-1].title())
            axs[i].set_title(train_key.replace("_", " ").title())
            axs[i].legend()
            axs[i].grid(True)

            annotate_values(axs[i], range(len(history_data[train_key])), history_data[train_key])
            if val_key in history_data:
                annotate_values(axs[i], range(len(history_data[val_key])), history_data[val_key])
        
        plt.tight_layout()
        plt.savefig(os.path.join(save_dir, f"{base_filename}_combined.png"))
        plt.close()
    else:
        for key in set(key.replace("val_", "") for key in history_data.keys()):
            train_key = key
            val_key = "val_" + key

            plt.figure(figsize=(8, 6))
            plt.plot(history_data[train_key], label=f"Train {train_key.replace('_', ' ').title()}", marker="o")
            if val_key in history_data:
                plt.plot(history_data[val_key], label=f"{val_key.replace('_', ' ').title()}", marker="o")

            plt.xlabel("Epochs")
            plt.ylabel(train_key.split("_")[-1].title())
            plt.title(train_key.replace("_", " ").title())
            plt.legend()
            plt.grid(True)
            
            annotate_values(plt.gca(), range(len(history_data[train_key])), history_data[train_key])
            if val_key in history_data:
                annotate_values(plt.gca(), range(len(history_data[val_key])), history_data[val_key])
            
            plt.savefig(os.path.join(save_dir, f"{base_filename}_{train_key}.png"))
            plt.close()



In [3]:
def mm_cnn(
    num_date_classes:int ,
    model_id: str=None,
    max_length: int=514,
    dense_units: int=16,
    conv_filters: int=32, 
    conv_kernel_size: int=3, 
):
    """
    Crée un modèle Keras avec un modèle BERT pré-entraîné pour une tâche multitâche de classification.

    Parameters:
    - model_id: str, identifiant du modèle pré-entraîné à utiliser (par exemple, 'bert-base-uncased')
    - max_length: int, la longueur maxiHomme des séquences d'entrée
    - dense_units: int, nombre d'unités pour les couches denses individuelles
    - concat_dense_units: int, nombre d'unités pour la couche dense après concaténation

    Returns:
    - model: Keras Model, le modèle compilé
    """
    # Charger le modèle BERT pré-entraîné
    bert_model = TFAutoModel.from_pretrained(model_id)

    # Définir les entrées du modèle
    input_ids = Input(shape=(max_length,), dtype=tf.int32, name="input_ids")
    attention_mask = Input(shape=(max_length,), dtype=tf.int32, name="attention_mask")

    # Passer les entrées dans le modèle BERT
    bert_output = bert_model(input_ids, attention_mask=attention_mask) 
    sequence_output = bert_output.last_hidden_state  # Shape: (batch_size, sequence_length, hidden_size)  Utiliser ensuite avec un CNN
    # pooled_output = bert_output.pooler_output  # Shape: (batch_size, hidden_size) # Utiliser le pooled_output pour la classification

    # CNN
    conv_layer = Conv1D(filters=conv_filters, kernel_size=conv_kernel_size, activation="relu", name="Conv1D")(sequence_output)
    pooling_layer = MaxPooling1D(pool_size=2, name="MaxPooling1D")(conv_layer)
    flatten_layer = Flatten(name="Flatten")(pooling_layer)
    dropout_layer = Dropout(0.3, name="Dropout")(flatten_layer) # Ajout des couches supplémentaires pour les tâches spécifiques

    # Dense layers for individual tasks
    dense_layer = Dense(units=dense_units, activation="relu", name="Dense")(dropout_layer)

    # Output layers for individual tasks
    sexe_output = Dense(1, activation="sigmoid", name="Sexe_output")(dense_layer)
    date_output = Dense(num_date_classes, activation="softmax", name="Date_output")(dense_layer)

    # Créer le modèle
    model = Model(inputs=[input_ids, attention_mask], outputs=[sexe_output, date_output])

    return model

In [4]:
# 1 : Charger les données d'entraînements et de validation
inputs_and_labels = data.main()

train_inputs = inputs_and_labels["train_inputs"]
train_sexe_labels = inputs_and_labels["train_sexe_labels"]
train_date_labels = inputs_and_labels["train_date_labels"]

val_inputs = inputs_and_labels["val_inputs"]
val_sexe_labels = inputs_and_labels["val_sexe_labels"]
val_date_labels = inputs_and_labels["val_date_labels"]

# Libérer la mémoire occupée par inputs_and_labels
del inputs_and_labels

# Créer le répertoire s'il n'existe pas
os.makedirs(config.MM_CNN_RESULT_PATH, exist_ok=True)

# 1. Intialiser le modèle
model = mm_cnn(num_date_classes=len(config.DATE_MAP), model_id=config.MODEL_ID)

# 2. Compiler le modèle avec les fonctions de perte appropriées pour chaque sortie
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=2e-5),
    loss={
        "Sexe_output": "binary_crossentropy", 
        "Date_output": custom_loss,
    },
    loss_weights={
        "Sexe_output": config.SEXE_LOSS_WEIGHT,
        "Date_output": config.DATE_LOSS_WEIGHT
    },
    metrics={
        "Sexe_output": "accuracy", 
        "Date_output": custom_metric,
    }
)

# Sauvegarder l'architecture du modèle en .png
plot_model(model=model, show_shapes=True, to_file=config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_model_arch.png")

# Callbacks
checkpoint_callback = ModelCheckpoint(
    filepath=config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_best_model.h5",
    monitor="val_loss",
    save_best_only=True,
    mode="min", # Sauvegarder le modèle avec la perte minimale
    verbose=1
)

# Early stopping
early_stopping_callback = EarlyStopping(
    monitor="val_loss",
    patience=config.PATIENCE,
    verbose=1, # Affichage d'un message
    restore_best_weights=True # Restaurer les poids du meilleur modèle après l'arrêt
)

# 3. Entraîner le modèle
history = model.fit(
    x=train_inputs,
    y={
        "Sexe_output": train_sexe_labels, 
        "Date_output": train_date_labels,
        },
    epochs=config.EPOCHS,
    batch_size=config.BATCH_SIZE,
    callbacks=[
        checkpoint_callback, 
        early_stopping_callback,
    ],
    validation_data=(
        val_inputs, 
        {
            "Sexe_output": val_sexe_labels, 
            "Date_output": val_date_labels,
        }),
)
    
# Sauvegarder l'historique de l'entraînement en .json
with open(config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_history.json", "w") as json_file:
    json.dump(history.history, json_file, indent=4)
# Sauvegarder l'historique de l'entraînement en .png
save_training_history(history_data=history.history, save_dir=config.MM_CNN_RESULT_PATH, base_filename=config.MM_CNN, single_figure=True)
save_training_history(history_data=history.history, save_dir=config.MM_CNN_RESULT_PATH, base_filename=config.MM_CNN, single_figure=False)


# Evaluation --------------------------------


# 1. Charger les données de tests
inputs_and_labels = data.main()

test_inputs = inputs_and_labels["test_inputs"]
test_sexe_labels = inputs_and_labels["test_sexe_labels"]
test_date_labels = inputs_and_labels["test_date_labels"]

# Libérer la mémoire occupée par inputs_and_labels
del inputs_and_labels

# Évaluer le modèle sur les données de test avec le GPU
evaluation_results = model.evaluate(
    x = test_inputs,
    y = {
        "Sexe_output": np.array(test_sexe_labels),
        "Date_output": np.array(test_date_labels),
    },
    return_dict=True
)
    
print("\nEvaluation...\n")
pprint.pp(evaluation_results)

# Sauvegarder les résultats de l'évaluation
with open(config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_evaluation_results.json", "w") as json_file:
    json.dump(evaluation_results, json_file, indent=4)
    
# Charger le modèle pré entraîné
# model = load_model(
#     filepath=config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_best_model.h5", 
#     custom_objects=custom_objects_dict()
# )

print("\nPrédictions...\n")
prediction_df = prediction(model=model, input_data=test_inputs, sexe_label=test_sexe_labels, date_label=test_date_labels)

evaluate_model(
    df=prediction_df, 
    confusion_matrix_output=config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_confusion_matrix.png", 
    roc_curve_output=config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_roc_curve.png"
)

def map_true_date_to_interval(date):
    for _, value in config.DATE_MAP.items():
        start, end = map(int, value.strip("[]()").split(", "))
        if start <= date < end:
            return value
    return None  # Return None if date does not fall into any defined interval

# Apply the mapping function to populate the interval column
prediction_df["interval"] = prediction_df["true date"].apply(map_true_date_to_interval)

save_hist_confusion_matrix(df=prediction_df, output_file=config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_hist_confusion_matrix.png")

save_accuracy_by_interval_and_gender(df=prediction_df, output_file=config.MM_CNN_RESULT_PATH + f"{config.MM_CNN}_accuracy_by_interval_sexes.png")

Some weights of the PyTorch model were not used when initializing the TF 2.0 model TFCamembertModel: ['lm_head.layer_norm.weight', 'lm_head.layer_norm.bias', 'roberta.embeddings.position_ids', 'lm_head.dense.weight', 'lm_head.dense.bias', 'lm_head.bias']
- This IS expected if you are initializing TFCamembertModel from a PyTorch model trained on another task or with another architecture (e.g. initializing a TFBertForSequenceClassification model from a BertForPreTraining model).
- This IS NOT expected if you are initializing TFCamembertModel from a PyTorch model that you expect to be exactly identical (e.g. initializing a TFBertForSequenceClassification model from a BertForSequenceClassification model).
Some weights or buffers of the TF 2.0 model TFCamembertModel were not initialized from the PyTorch model and are newly initialized: ['roberta.pooler.dense.weight', 'roberta.pooler.dense.bias']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions 

Epoch 1/3
Epoch 1: val_loss improved from inf to 1.39482, saving model to ../results/mm_cnn\mm_cnn_best_model.h5
Epoch 2/3
Epoch 2: val_loss did not improve from 1.39482
Restoring model weights from the end of the best epoch: 1.
Epoch 2: early stopping

Evaluation...

{'loss': 1.4530818462371826,
 'Sexe_output_loss': 0.7265522480010986,
 'Date_output_loss': 7265.296875,
 'Sexe_output_accuracy': 0.5495049357414246,
 'Date_output_custom_metric': 0.1607142835855484}

Prédictions...


Sexe - Accuracy: 0.5495049504950495 
Precision: 0.5566037735849056 
Recall: 0.5728155339805825 
F1 Score: 0.5645933014354068 
AUC: 0.5490340296165539

total
total
total
total
total
total
total
total
total
total
total
total
total
total
total
total


In [17]:
def map_date_to_interval(date):
    if 1825 <= date < 1850:
        return config.DATE_MAP[0]
    elif 1850 <= date < 1875:
        return config.DATE_MAP[1]
    elif 1875 <= date < 1900:
        return config.DATE_MAP[2]
    elif 1900 <= date < 1925:
        return config.DATE_MAP[3]
    elif 1925 <= date < 1950:
        return config.DATE_MAP[4]
    elif 1950 <= date < 1975:
        return config.DATE_MAP[5]
    elif 1975 <= date < 2000:
        return config.DATE_MAP[6]
    elif 2000 <= date < 2024:
        return config.DATE_MAP[7]
    else:
        return float('nan')

# Appliquer la fonction à la colonne 'true date'
prediction_df["interval"] = prediction_df["true date"].apply(map_date_to_interval)


In [13]:
# Function to map true date to interval
def map_true_date_to_interval(date):
    for key, value in config.DATE_MAP.items():
        start, end = map(int, value.strip('[]()').split(', '))
        if start <= date < end:
            return value
    return None  # Return None if date does not fall into any defined interval

# Apply the mapping function to populate the interval column
prediction_df['interval'] = prediction_df['true date'].apply(map_true_date_to_interval)


In [14]:
prediction_df

Unnamed: 0,true sexe,pred sexe,true date,interval
0,1,0,1827,"[1825, 1850)"
1,1,0,1831,"[1825, 1850)"
2,0,0,1832,"[1825, 1850)"
3,0,0,1832,"[1825, 1850)"
4,0,0,1832,"[1825, 1850)"
...,...,...,...,...
197,0,0,2017,"[2000, 2024)"
198,0,1,2017,"[2000, 2024)"
199,0,0,2022,"[2000, 2024)"
200,0,1,2022,"[2000, 2024)"


In [14]:
prediction_df

Unnamed: 0,true sexe,pred sexe,true date,interval
0,1,1,1827,
1,1,1,1831,
2,0,0,1832,
3,0,0,1832,
4,0,0,1832,
...,...,...,...,...
197,0,0,2017,
198,0,0,2017,
199,0,0,2022,
200,0,0,2022,


In [13]:
config.DATE_MAP

{0: '[1825, 1850)',
 1: '[1850, 1875)',
 2: '[1875, 1900)',
 3: '[1900, 1925)',
 4: '[1925, 1950)',
 5: '[1950, 1975)',
 6: '[1975, 2000)',
 7: '[2000, 2024)'}

In [12]:
prediction_df["true date"].map(config.DATE_MAP)

0      NaN
1      NaN
2      NaN
3      NaN
4      NaN
      ... 
197    NaN
198    NaN
199    NaN
200    NaN
201    NaN
Name: true date, Length: 202, dtype: object

In [10]:
prediction_df

Unnamed: 0,true sexe,pred sexe,true date,interval
0,1,1,1827,
1,1,1,1831,
2,0,0,1832,
3,0,0,1832,
4,0,0,1832,
...,...,...,...,...
197,0,0,2017,
198,0,0,2017,
199,0,0,2022,
200,0,0,2022,


In [None]:
import config
import data
import os

from transformers import TFAutoModel
import tensorflow as tf

from keras.models import Model
from keras.utils.vis_utils import plot_model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.layers import (
    Concatenate,
    Conv1D,
    Dense,
    Dropout,
    # Embedding,
    Flatten,
    Input,
    MaxPooling1D,
)

import json
import numpy as np
import pprint


from training_functions import (
    # calculate_estimated_year_tensor,
    custom_loss,
    # custom_objects_dict,
    custom_metric,
    evaluate_model,
    prediction,
    save_accuracy_by_interval_and_gender,
    save_hist_confusion_matrix,
    save_training_history,
)



def ss_cnn(
    model_id: str=None,
    max_length: int=514,
    dense_units: int=16,
    conv_filters: int=32, 
    conv_kernel_size: int=3, 
):
    """
    Crée un modèle Keras avec un modèle BERT pré-entraîné pour une tâche de classification du sexe.

    Parameters:
    - model_id: str, identifiant du modèle pré-entraîné à utiliser (par exemple, 'bert-base-uncased')
    - max_length: int, la longueur maxiHomme des séquences d'entrée
    - dense_units: int, nombre d'unités pour les couches denses
    - conv_filters: int, nombre de filtres pour la couche Conv1D
    - conv_kernel_size: int, taille du noyau pour la couche Conv1D

    Returns:
    - model: Keras Model, le modèle compilé
    """
    # Charger le modèle BERT pré-entraîné
    bert_model = TFAutoModel.from_pretrained(model_id)

    # Définir les entrées du modèle
    input_ids = Input(shape=(max_length,), dtype=tf.int32, name="input_ids")
    attention_mask = Input(shape=(max_length,), dtype=tf.int32, name="attention_mask")

    # Passer les entrées dans le modèle BERT
    bert_output = bert_model(input_ids, attention_mask=attention_mask)
    sequence_output = bert_output.last_hidden_state  # Shape: (batch_size, sequence_length, hidden_size)

    # CNN
    conv_layer = Conv1D(filters=conv_filters, kernel_size=conv_kernel_size, activation="relu", name="Conv1D")(sequence_output)
    pooling_layer = MaxPooling1D(pool_size=2, name="MaxPooling1D")(conv_layer)
    flatten_layer = Flatten(name="Flatten")(pooling_layer)
    dropout_layer = Dropout(0.3, name="Dropout")(flatten_layer)

    # Dense layer for sex classification
    dense_layer_sexe = Dense(units=dense_units, activation="relu", name="Dense_sexe")(dropout_layer)

    # Output layer for sex classification
    sexe_output = Dense(1, activation="sigmoid", name="Sexe_output")(dense_layer_sexe)

    # Créer le modèle
    model = Model(inputs=[input_ids, attention_mask], outputs=sexe_output)

    return model


def main():
    # 1 : Charger les données d'entraînements et de validation
    inputs_and_labels = data.main()

    train_inputs = inputs_and_labels["train_inputs"]
    train_sexe_labels = inputs_and_labels["train_sexe_labels"]
    # train_date_labels = inputs_and_labels["train_date_labels"]

    val_inputs = inputs_and_labels["val_inputs"]
    val_sexe_labels = inputs_and_labels["val_sexe_labels"]
    # val_date_labels = inputs_and_labels["val_date_labels"]

    # Libérer la mémoire occupée par inputs_and_labels
    del inputs_and_labels

    # Créer le répertoire s'il n'existe pas
    os.makedirs(config.SS_CNN_RESULT_PATH, exist_ok=True)

    # 1. Intialiser le modèle
    model = ss_cnn(model_id=config.MODEL_ID)

    # 2. Compiler le modèle avec les fonctions de perte appropriées pour chaque sortie
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=2e-5),
        loss={
            "Sexe_output": "binary_crossentropy", 
            # "Date_output": custom_loss,
        },
        loss_weights={
            "Sexe_output": config.SEXE_LOSS_WEIGHT,
            # "Date_output": config.DATE_LOSS_WEIGHT
        },
        metrics={
            "Sexe_output": "accuracy", 
            # "Date_output": custom_metric,
        }
    )

    # Sauvegarder l'architecture du modèle en .png
    plot_model(model=model, show_shapes=True, to_file=config.SS_CNN_RESULT_PATH + f"{config.SS_CNN}_model_arch.png")

    # Callbacks
    checkpoint_callback = ModelCheckpoint(
        filepath=config.SS_CNN_RESULT_PATH + f"{config.SS_CNN}_best_model.h5",
        monitor="val_loss",
        save_best_only=True,
        mode="min", # Sauvegarder le modèle avec la perte minimale
        verbose=1
    )

    # Early stopping
    early_stopping_callback = EarlyStopping(
        monitor="val_loss",
        patience=config.PATIENCE,
        verbose=1, # Affichage d'un message
        restore_best_weights=True # Restaurer les poids du meilleur modèle après l'arrêt
    )

    # 3. Entraîner le modèle
    print("\nDébut entraînement ss_cnn\n")
    history = model.fit(
        x=train_inputs,
        y={
            "Sexe_output": train_sexe_labels, 
            # "Date_output": train_date_labels,
            },
        epochs=config.EPOCHS,
        batch_size=config.BATCH_SIZE,
        callbacks=[
            checkpoint_callback, 
            early_stopping_callback,
        ],
        validation_data=(
            val_inputs, 
            {
                "Sexe_output": val_sexe_labels, 
                # "Date_output": val_date_labels,
            }),
    )
    print("\Fin entraînement ss_cnn\n")
        
    # Sauvegarder l'historique de l'entraînement en .json
    with open(config.SS_CNN_RESULT_PATH + f"{config.SS_CNN}_history.json", "w") as json_file:
        json.dump(history.history, json_file, indent=4)
    # Sauvegarder l'historique de l'entraînement en .png
    save_training_history(history_data=history.history, save_dir=config.SS_CNN_RESULT_PATH, base_filename=config.SS_CNN, single_figure=True)
    save_training_history(history_data=history.history, save_dir=config.SS_CNN_RESULT_PATH, base_filename=config.SS_CNN, single_figure=False)


    # Evaluation --------------------------------


    # 1. Charger les données de tests
    inputs_and_labels = data.main()

    test_inputs = inputs_and_labels["test_inputs"]
    test_sexe_labels = inputs_and_labels["test_sexe_labels"]
    test_date_labels = inputs_and_labels["test_date_labels"]

    # Libérer la mémoire occupée par inputs_and_labels
    del inputs_and_labels

    # Évaluer le modèle sur les données de test avec le GPU
    evaluation_results = model.evaluate(
        x = test_inputs,
        y = {
            "Sexe_output": np.array(test_sexe_labels),
            # "Date_output": np.array(test_date_labels),
        },
        return_dict=True
    )
        
    print("\nEvaluation...\n")
    pprint.pp(evaluation_results)

    # Sauvegarder les résultats de l'évaluation
    with open(config.SS_CNN_RESULT_PATH + f"{config.SS_CNN}_evaluation_results.json", "w") as json_file:
        json.dump(evaluation_results, json_file, indent=4)
        
    # Charger le modèle pré entraîné
    # model = load_model(
    #     filepath=config.SS_CNN_RESULT_PATH + f"{config.SS_CNN}_best_model.h5", 
    #     custom_objects=custom_objects_dict()
    # )

    print("\nPrédictions...\n")
    prediction_df = prediction(model=model, input_data=test_inputs, sexe_label=test_sexe_labels, date_label=None)

    evaluate_model(
        df=prediction_df, 
        confusion_matrix_output=config.SS_CNN_RESULT_PATH + f"{config.SS_CNN}_confusion_matrix.png", 
        roc_curve_output=config.SS_CNN_RESULT_PATH + f"{config.SS_CNN}_roc_curve.png"
    )

    prediction_df["interval"] = prediction_df["true date"].map(config.DATE_MAP)

    save_hist_confusion_matrix(df=prediction_df, output_file=config.SS_CNN_RESULT_PATH + f"{config.SS_CNN}_hist_confusion_matrix.png")

    save_accuracy_by_interval_and_gender(df=prediction_df, output_file=config.SS_CNN_RESULT_PATH + f"{config.SS_CNN}_accuracy_by_interval_sexes.png")

In [None]:
main()