## Load resources

In [None]:
import pandas as pd

# Read the three pre-computed dataset splits (train / validation / test).
train_df, val_df, test_df = (
    pd.read_csv(csv_path)
    for csv_path in (
        "../data/image_classification/split_train.csv",
        "../data/image_classification/split_val.csv",
        "../data/image_classification/split_test.csv",
    )
)

# Cell output: the (rows, columns) shape of each split.
tuple(split.shape for split in (train_df, val_df, test_df))

In [None]:
from tensorflow.keras.models import load_model, Model

# Restore the previously saved text and image models from their .h5 files.
text_model, image_model = (
    load_model(model_path)
    for model_path in ("path_to_text_model.h5", "path_to_image_model.h5")
)

print("Text model and image model loaded successfully.")

In [None]:
import tensorflow as tf


# Function to load images
def load_image(image_path):
    """Read an image file and return it as a 224x224, 3-channel tensor.

    Args:
        image_path: Location of the file, as accepted by ``tf.io.read_file``.

    Returns:
        The JPEG-decoded image resized to 224x224, with every value divided
        by 255.0.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, [224, 224])  # spatial size fed to the model
    return resized / 255.0


def process_labels(labels):
    """Convert a collection of labels into a ``tf.float32`` tensor."""
    label_tensor = tf.convert_to_tensor(labels, dtype=tf.float32)
    return label_tensor

In [None]:
import numpy as np

# Label matrix of the training split: every column from the fourth onwards,
# minus the "Processed_Caption" text column that a later cell appends to
# train_df (dropping it keeps this cell re-runnable after that cell has run).
labels_array = np.array(
    train_df.iloc[:, 3:].drop(columns=["Processed_Caption"], errors="ignore").values
)

class_counts = labels_array.sum(axis=0)  # Count positive samples per class
num_samples, num_label_columns = len(labels_array), len(class_counts)

# Inverse-frequency weights: n_samples / (n_classes * n_positives).
# A class without any positive sample gets weight 0.0 instead of dividing by
# zero: the resulting infinite weight would turn every sample weight into NaN
# (0 * inf) as soon as the weights are combined with the label matrix.
class_weights = {
    i: (
        num_samples / (num_label_columns * class_counts[i])
        if class_counts[i] > 0
        else 0.0
    )
    for i in range(num_label_columns)
}


def compute_sample_weights(labels, class_weights):
    """Compute one weight per sample as the sum of the weights of its labels.

    Args:
        labels: 2-D array-like of shape (n_samples, n_classes) holding the
            multi-hot label values of each sample.
        class_weights: Mapping from class index (0 .. n_classes - 1) to the
            weight of that class.

    Returns:
        1-D float array of length n_samples where entry ``s`` equals
        ``sum(class_weights[c] * labels[s][c])`` over all classes ``c``.
        Classes whose label value is 0 contribute exactly 0, even when their
        weight is infinite.
    """
    label_matrix = np.asarray(labels, dtype=float)
    if label_matrix.size == 0:
        # No samples (or no label columns): nothing to weight.
        return np.zeros(len(label_matrix), dtype=float)

    weight_vector = np.array(
        [class_weights[i] for i in range(label_matrix.shape[1])], dtype=float
    )

    # 0 * inf is NaN; silence the warning here and overwrite those cells
    # below so an absent label never poisons the sample's weight.
    with np.errstate(invalid="ignore"):
        weighted = label_matrix * weight_vector
    weighted = np.where(label_matrix != 0, weighted, 0.0)

    return weighted.sum(axis=1)


# One weight per training sample, derived from its labels and class_weights.
sample_weights = compute_sample_weights(labels_array, class_weights)

In [None]:
import nltk

# Fetch the NLTK resources used by the text preprocessing further down:
# tokenizer models ("punkt", "punkt_tab"), the stop-word corpus and WordNet
# (presumably required by word_tokenize, stopwords and WordNetLemmatizer).
nltk.download("punkt")
nltk.download("stopwords")
nltk.download("wordnet")
nltk.download("punkt_tab")

In [None]:
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk.stem import WordNetLemmatizer
import re


from functools import lru_cache


@lru_cache(maxsize=1)
def _text_resources():
    """Return the English stop-word set and a lemmatizer, built on first use.

    Cached so the stop-word list is not re-read and the lemmatizer is not
    re-created for every caption when the function is applied to a whole
    DataFrame column.
    """
    return frozenset(stopwords.words("english")), WordNetLemmatizer()


def preprocess_text(text):
    """Normalize a caption: lowercase, letters only, no stop words, lemmatized.

    Args:
        text: Raw caption. Non-string values (for example the NaN pandas uses
            for a missing cell, or None) are treated as an empty caption.

    Returns:
        The remaining lemmatized tokens joined by single spaces; an empty
        string when nothing is left.
    """
    if not isinstance(text, str):
        return ""

    # Normalization: lowercase, then delete every character that is neither
    # a-z nor whitespace (the characters are removed, not replaced by spaces).
    cleaned = re.sub(r"[^a-z\s]", "", text.lower())

    # Tokenization
    tokens = word_tokenize(cleaned)

    # Drop English stop words, then lemmatize what remains.
    stop_words, lemmatizer = _text_resources()
    return " ".join(
        lemmatizer.lemmatize(token) for token in tokens if token not in stop_words
    )

In [None]:
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Full caption corpus (rows pandas cannot parse are skipped), with the
# cleaned caption added as a "Processed_Caption" column.
data = pd.read_csv(
    "../data/image_classification/train.csv", on_bad_lines="skip"
).assign(
    Processed_Caption=lambda corpus: corpus["Caption"].apply(preprocess_text)
)

# Add the same cleaned-caption column to each split.
for split_df in (train_df, val_df, test_df):
    split_df["Processed_Caption"] = split_df["Caption"].apply(preprocess_text)

# Fit the vocabulary on the full corpus; sequences are padded to this length.
max_sequence_length = 100
tokenizer = Tokenizer()
tokenizer.fit_on_texts(data["Processed_Caption"])
word_index = tokenizer.word_index
print(f"Vocabulaire: {len(word_index)} mots uniques")


def tokenize_text(text):
    """Encode one caption as a fixed-length vector of vocabulary indices.

    The caption is converted with the fitted ``tokenizer`` and post-padded to
    ``max_sequence_length`` entries.
    """
    encoded = tokenizer.texts_to_sequences([text])
    padded = pad_sequences(encoded, maxlen=max_sequence_length, padding="post")
    return padded[0]

In [None]:
AUTOTUNE = tf.data.AUTOTUNE
BATCH_SIZE = 32


def _encode_captions(captions):
    """Tokenize and pad a column of preprocessed captions in one eager pass.

    The Keras ``Tokenizer`` is plain Python and cannot run inside
    ``Dataset.map``, which hands the mapped function a symbolic string
    tensor. The captions are therefore encoded up front, with the same
    tokenizer and padding settings that ``tokenize_text`` uses.
    """
    sequences = tokenizer.texts_to_sequences(captions.tolist())
    return pad_sequences(sequences, maxlen=max_sequence_length, padding="post")


def _label_matrix(df):
    """Return the label columns of ``df`` as an array.

    Labels are the columns from the fourth onwards. The "Processed_Caption"
    text column appended to the splits by an earlier cell is excluded so the
    caption text is not handed to ``process_labels`` as if it were a label.
    """
    return df.iloc[:, 3:].drop(columns=["Processed_Caption"], errors="ignore").values


def _build_dataset(df, weights=None):
    """Build a batched dataset of (tokens, image, labels[, weight]) tuples.

    Args:
        df: Split DataFrame with "Processed_Caption", "ImageURL" and the
            label columns.
        weights: Optional per-sample weights; when given they are appended
            as a fourth component of every element.

    Returns:
        A ``tf.data.Dataset`` batched by ``BATCH_SIZE`` and prefetched.
    """
    components = [
        tf.data.Dataset.from_tensor_slices(_encode_captions(df["Processed_Caption"])),
        tf.data.Dataset.from_tensor_slices(df["ImageURL"]).map(
            load_image, num_parallel_calls=AUTOTUNE
        ),
        tf.data.Dataset.from_tensor_slices(process_labels(_label_matrix(df))),
    ]
    if weights is not None:
        components.append(tf.data.Dataset.from_tensor_slices(weights))

    return tf.data.Dataset.zip(tuple(components)).batch(BATCH_SIZE).prefetch(AUTOTUNE)


# Elements: (tokens, image, labels, sample_weight)
train_ds = _build_dataset(train_df, sample_weights)

# Elements: (tokens, image, labels)
val_ds = _build_dataset(val_df)
test_ds = _build_dataset(test_df)

## Fusion

In [None]:
# Text feature extractor: same input as text_model, output taken from its
# fourth-from-last layer.
text_features = Model(
    inputs=text_model.input,
    outputs=text_model.layers[-4].output,
)

# Visual feature extractor: same construction on top of image_model.
visual_features = Model(
    inputs=image_model.input,
    outputs=image_model.layers[-4].output,
)


print("Caractéristiques textuelles :", text_features)
print("Caractéristiques visuelles :", visual_features)

In [None]:
def concatenate_features(text_features, visual_features):
    """Fuse text and visual features by concatenating them on the last axis.

    Args:
        text_features: Tensor or array of text features.
        visual_features: Tensor or array of visual features.

    Returns:
        The result of ``tf.concat`` with the text features first.
    """
    fused = tf.concat(values=[text_features, visual_features], axis=-1)
    return fused


# Usage example: run both feature extractors over the training set and fuse
# their outputs. Each train_ds element is (tokens, image, labels, weight).
text_only_ds = train_ds.map(lambda tokens, image, labels, weight: tokens)
image_only_ds = train_ds.map(lambda tokens, image, labels, weight: image)

text_features_output = text_features.predict(text_only_ds)
visual_features_output = visual_features.predict(image_only_ds)

common_representation = concatenate_features(
    text_features_output,
    visual_features_output,
)
print("Représentation commune des données créée : ", common_representation.shape)

In [None]:
from tensorflow.keras.layers import Dense, Dropout


def build_classification_head(input_features, num_classes):
    """Stack the multi-label classification layers on top of ``input_features``.

    The head is Dense(256, relu) -> Dropout(0.5) -> Dense(num_classes, sigmoid).

    Args:
        input_features: Tensor the first dense layer is applied to.
        num_classes: Number of units of the final sigmoid layer.

    Returns:
        The output of the final sigmoid layer.
    """
    hidden = Dense(units=256, activation="relu")(input_features)
    hidden = Dropout(rate=0.5)(hidden)
    return Dense(units=num_classes, activation="sigmoid")(hidden)


# Number of classes = number of label columns (fourth column onwards). The
# "Processed_Caption" text column appended to train_df by an earlier cell is
# not a label and must not be counted, otherwise the output layer would get
# one unit too many.
num_classes = (
    train_df.iloc[:, 3:].drop(columns=["Processed_Caption"], errors="ignore").shape[1]
)
classification_output = build_classification_head(common_representation, num_classes)

print("Classification head ajoutée avec succès.")

## Entraînement conjoint

In [None]:
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input

# Define inputs for text and image
text_input = Input(shape=(max_sequence_length,), name="text_input")
image_input = Input(shape=(224, 224, 3), name="image_input")

# Extract features using pre-trained models
text_features_output = text_features(text_input)
visual_features_output = visual_features(image_input)

# Concatenate features
combined_features = concatenate_features(text_features_output, visual_features_output)

# Add classification head
classification_output = build_classification_head(combined_features, num_classes)

# Define the final model
combined_model = Model(inputs=[text_input, image_input], outputs=classification_output)

# Compile the model
combined_model.compile(
    optimizer="adam",
    loss="binary_crossentropy",
    metrics=[
        # The "accuracy" shortcut resolves to categorical (argmax) accuracy
        # when the output has more than one unit, which is not meaningful for
        # multi-hot targets. Score every label independently at a 0.5
        # threshold instead; the metric keeps the name "accuracy" so the
        # history/log keys are unchanged.
        tf.keras.metrics.BinaryAccuracy(name="accuracy", threshold=0.5),
        tf.keras.metrics.AUC(
            multi_label=True,
            num_thresholds=200,
            curve="ROC",
            summation_method="interpolation",
        ),
    ],
)

combined_model.summary()

In [None]:
# Train the model

callbacks = [
    tf.keras.callbacks.EarlyStopping(
        monitor="val_loss", patience=3, restore_best_weights=True
    ),
    # tf.keras.callbacks.ModelCheckpoint('best_model.h5', save_best_only=True, monitor='val_loss')
]

# train_ds carries a per-sample weight as its fourth component; forward it to
# Keras as the sample weight. `class_weight` is deliberately not used: Keras
# applies it by looking up a single class per sample, which does not fit
# multi-hot targets, whereas the per-sample weights already combine the
# weights of all labels a sample carries.
train_inputs = train_ds.map(
    lambda tokens, image, labels, weight: (
        (tokens, image),
        labels,
        tf.cast(weight, tf.float32),
    ),
    num_parallel_calls=AUTOTUNE,
)

# Validation elements have no weight component; built once and reused for
# both validation during training and the final evaluation.
val_inputs = val_ds.map(
    lambda tokens, image, labels: ((tokens, image), labels),
    num_parallel_calls=AUTOTUNE,
)

history = combined_model.fit(
    train_inputs,
    validation_data=val_inputs,
    epochs=5,
    callbacks=callbacks,
)

print("Evaluation du modèle sur le jeu de validation :")
combined_model.evaluate(val_inputs, verbose=1)

In [None]:
def multi_label_threshold(y_pred, threshold=0.5, threshold_nb=None):
    """Binarize multi-label scores, by fixed threshold or by top-k selection.

    Args:
        y_pred: Array-like of scores; the last axis indexes the labels.
        threshold: Scores strictly greater than this value become 1. Only
            used when ``threshold_nb`` is None.
        threshold_nb: When given, ignore ``threshold`` and mark the
            ``threshold_nb`` highest-scoring labels of each row with 1.
            0 selects nothing; values above the number of labels select all.

    Returns:
        Integer array with the same shape as ``y_pred`` containing 0/1 flags.

    Raises:
        ValueError: If ``threshold_nb`` is negative.
    """
    scores = np.asarray(y_pred)

    if threshold_nb is None:
        return (scores > threshold).astype(int)

    if threshold_nb < 0:
        raise ValueError(f"threshold_nb must be >= 0, got {threshold_nb}")

    selected = np.zeros_like(scores, dtype=int)
    top_k = min(threshold_nb, scores.shape[-1])
    if top_k == 0:
        # Guard needed because a `[-0:]` slice would select every label.
        return selected

    # Indices of the top_k largest scores of every row (ascending sort).
    top_indices = np.argsort(scores, axis=-1)[..., -top_k:]
    np.put_along_axis(selected, top_indices, 1, axis=-1)
    return selected

In [None]:
# Exemple avec un échantillon de test
import matplotlib.pyplot as plt

nb_test = 3

# Take one batch of the test set and predict it in a single call. The model
# has two inputs, so it is fed the list [tokens, images].
text_batch, image_batch, label_batch = next(test_ds.take(1).as_numpy_iterator())
predictions = combined_model.predict([text_batch, image_batch])
top3_pred = multi_label_threshold(predictions, threshold_nb=3)

# Show the first nb_test samples of that batch (fewer if the batch is smaller),
# each with its own tokens, predictions, scores and true labels.
for nb in range(min(nb_test, len(image_batch))):
    plt.imshow(image_batch[nb])
    plt.axis("off")
    plt.show()

    print(f"Texte : {text_batch[nb]}")
    print(f"Prédictions : {top3_pred[nb]}")
    print(f"Confiance : {predictions[nb]}")
    print(f"Vrais labels : {label_batch[nb]}")