In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf

import matplotlib.pyplot as plt

# BILSTM

## Chargement des Données

In [None]:
# Données d'entrainement
train_data_complete = pd.read_csv("../data/allocine_genres_train.csv", sep=",")
train_data = train_data_complete[["titre", "synopsis", "genre"]]

# Données de test/validation
test_data_complete = pd.read_csv("../data/allocine_genres_test.csv", sep=",")
test_data = test_data_complete[["titre", "synopsis", "genre"]]

Lister les classes et leur associer un identifiant unique. (Utile pour le plongement des mots et pour l'entraînement du CNN).

In [None]:
# Liste des genres
genre_name = sorted(train_data.genre.unique().flatten())
print("Genres:", genre_name)
print("Nombre d'exemplaires:", len(train_data))

# Identifiant unique par genre
genre_index = {genre_name[i]:i for i in range(len(genre_name))}
genre_index

Remplacer les genres par la valeur numérique associée.

In [None]:
train_data = train_data.replace({"genre": genre_index})
train_data.head()

In [None]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(train_data[["titre", "synopsis"]],
                                                    train_data[["genre"]],
                                                    test_size=0.001,
                                                    random_state=12, # Random seed for shuffle
                                                    shuffle=True)

On combine le titre et le synopsis pour pouvoir les vectoriser par la suite.

In [None]:
X_train_titre = X_train.titre
X_train = X_train_titre + " " + X_train.synopsis

In [None]:
X_train.head()

In [None]:
y_train.head()

In [None]:
print(X_train.shape)
print(y_train.shape)

## Indexation du Vocabulaire

In [None]:
def get_vectorizer(documents, max_voc_size=8000, max_seq_length= 200, batch_size=64):
	vectorizer = tf.keras.layers.TextVectorization(max_tokens=max_voc_size, output_sequence_length=max_seq_length)
	# Création du jeu de données à partir de X_train et constitution de lots de 128 instances
	text_ds = tf.data.Dataset.from_tensor_slices(documents).batch(batch_size)
	# Création du vocabulaire à partir des données d'entrée
	vectorizer.adapt(text_ds)
	return vectorizer

In [None]:
vectorizer = get_vectorizer(X_train)

In [None]:
voc = vectorizer.get_vocabulary()
print(len(voc))

In [None]:
voc[:10]

In [None]:
word_index = dict(zip(voc, range(len(voc))))

In [None]:
print("Texte initial:", X_train.iloc[1])
output = vectorizer([X_train.iloc[1]])
print("Vocabulaire dans le texte (15 premiers items):")
for v in output.numpy()[0, :15]:
    print(v, vectorizer.get_vocabulary()[v])

## Chargement de Plongements de Mots Pré-entraînés

In [None]:
from gensim.models import KeyedVectors
model = KeyedVectors.load_word2vec_format("../embedding/frWiki_no_phrase_no_postag_700_cbow_cut100.bin", binary=True, unicode_errors="ignore")
model.most_similar("bonjour")

In [None]:
model.key_to_index

In [None]:
model["bonjour"]

Le plongement pré-entrainé est de dimension 700.

In [None]:
len(model["bonjour"])

In [None]:
def load_embeddings(embeddings_model):
    embeddings_index = {}
    for word in list(embeddings_model.key_to_index.keys()):
        embeddings_index[word] = embeddings_model[word]
    print(f'{len(embeddings_index)} vecteurs de mots ont été lus')
    return embeddings_index

In [None]:
embeddings = load_embeddings(model)

La cellule suivante permet de créer une matrice de plongements: une matrice où la ligne i correspond au plongement pré-entraîné pour le mot d'indice i dans le vocabulaire.

In [None]:
def get_embedding_matrix(vocabulary, embeddings_index, embedding_dim = 700):
  num_tokens = len(vocabulary)
  hits = 0
  misses = 0

  # Préparation de la matrice
  # Les mots qui ne se trouvent pas dans les plongements pré-entraînés seront 
  # représentés par des vecteurs dont toutes les composantes sont égales à 0,
  # y compris la représentation utilisée pour compléter les documents courts et
  # celle utilisée pour les mots inconnus [UNK]
  embedding_matrix = np.zeros((num_tokens, embedding_dim))
  for word, i in word_index.items():
      embedding_vector = embeddings_index.get(word)
      if embedding_vector is not None:
          embedding_matrix[i] = embedding_vector
          hits += 1
      else:
          misses += 1
  print(f'{hits} mots ont été trouvés dans les plongements pré-entraînés')
  print(f'{misses} sont absents')
  return embedding_matrix

In [None]:
# Construction de la matrice de plongements à partir du vocabulaire
embedding_matrix = get_embedding_matrix(voc, embeddings)

## Construction du LSTM

In [None]:
from tensorflow.keras.layers import Embedding

def get_biLSTM_model(voc_size, embedding_matrix, embedding_dim=700):
  # Création du modèle
  int_sequences_input = tf.keras.Input(shape=(None,), dtype="int64")
  embedding_layer = Embedding(voc_size, embedding_dim, trainable=True,
      embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix),
  )
  
  embedded_sequences = embedding_layer(int_sequences_input)
  x = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(64, dropout=0.2, recurrent_dropout=0.2))(embedded_sequences)
  preds = tf.keras.layers.Dense(len(genre_name), activation="softmax")(x)
  model = tf.keras.Model(int_sequences_input, preds)
  return model

In [None]:
# Affichage de l'architecture du modèle
biLSTM_model = get_biLSTM_model(len(voc), embedding_matrix)
biLSTM_model.summary()

## Entraînement du LSTM

In [None]:
from sklearn import model_selection
from tensorflow.keras.callbacks import EarlyStopping # Early stopping to avoid over-fitting

# Fonction pour l'entraînement d'un modèle
def train_model(X, y, model_function, vectorizer, voc_size, embedding_matrix, embedding_dim=700, batch_size=32): # 128
    
    # Early stopping callback
    early_stopping_callback = EarlyStopping(monitor="val_acc", restore_best_weights=True, patience=3)
    
    # Listes utilisées pour sauvegarder les résultats obtenus à chaque pli
    acc_per_fold = []
    loss_per_fold = []
    histories = []
    folds = 3 # 5
    stratkfold = model_selection.StratifiedKFold(n_splits=folds, shuffle=True, random_state=12)
    fold_no = 1
    for train, test in stratkfold.split(X, y):
        m_function = globals()[model_function]
        model = m_function(voc_size, embedding_matrix, embedding_dim)

        print('------------------------------------------------------------------------')
        print(f'Entraînement pour le pli {fold_no} ...')
        fold_x_train = vectorizer(X.iloc[train].to_numpy()).numpy()
        fold_x_val = vectorizer(X.iloc[test].to_numpy()).numpy()
        fold_y_train = y.iloc[train].to_numpy()
        fold_y_val = y.iloc[test].to_numpy()

        # Compilation du modèle : permet de préciser la fonction de perte et l'optimiseur
        # loss=sparse_categorical_crossentropy : entropie croisée, dans le cas où les 
        # classes cibles sont indiquées sous forme d'entiers. Il s'agira de minimiser
        # la perte pendant l'apprentissage
        # optimizer=rmsprop : l'optimiseur détermine la manière doit les poids seront
        # mis à jour pendant l'apprentissage
        model.compile(loss="sparse_categorical_crossentropy", optimizer="rmsprop", metrics=["acc"])
        # Entraînement sur 10 époque (la totalité du jeu de données est parcourue
        # 10 fois)
        history = model.fit(fold_x_train, fold_y_train, batch_size=batch_size, epochs=25, validation_data=(fold_x_val, fold_y_val), callbacks=[early_stopping_callback])
        histories.append(history)
        # Evaluation sur les données de validation
        scores = model.evaluate(fold_x_val, fold_y_val, verbose=0)
        print(f'Scores pour le pli {fold_no}: {model.metrics_names[0]} = {scores[0]:.2f};',
            f'{model.metrics_names[1]} = {scores[1]*100:.2f}%')
        acc_per_fold.append(scores[1] * 100)
        loss_per_fold.append(scores[0])
        fold_no = fold_no + 1

    # Affichage des scores moyens par pli
    print('---------------------------------------------------------------------')
    print('Scores par pli')
    for i in range(0, len(acc_per_fold)):
        print('---------------------------------------------------------------------')
        print(f'> Pli {i+1} - Loss: {loss_per_fold[i]:.2f}', f'- Accuracy: {acc_per_fold[i]:.2f}%')
    print('---------------------------------------------------------------------')
    print('Scores moyens pour tous les plis :')
    print(f'> Accuracy: {np.mean(acc_per_fold):.2f}', f'(+- {np.std(acc_per_fold):.2f})')
    print(f'> Loss: {np.mean(loss_per_fold):.2f}')
    print('---------------------------------------------------------------------')
    return histories

In [None]:
# Entraînement du modèle et récupération des résultats
biLSTM_histories = train_model(X_train, y_train, 'get_biLSTM_model', vectorizer, len(voc), embedding_matrix)

In [None]:
import seaborn as sns
sns.set_theme(style="darkgrid")

def plot_results(histories):
    accuracy_data = []
    loss_data = []
    for i, h in enumerate(histories):
        acc = h.history['acc']
        val_acc = h.history['val_acc']
        loss = h.history['loss']
        val_loss = h.history['val_loss']
        for j in range(len(acc)):
            accuracy_data.append([i+1, j+1, acc[j], 'Entraînement'])
            accuracy_data.append([i+1, j+1, val_acc[j], 'Validation'])
            loss_data.append([i+1, j+1, loss[j], 'Entraînement'])
            loss_data.append([i+1, j+1, val_loss[j], 'Validation'])

    acc_df = pd.DataFrame(accuracy_data, columns=['Pli', 'Epoch', 'Accuracy', 'Données'])
    sns.relplot(data=acc_df, x='Epoch', y='Accuracy', hue='Pli', style='Données', kind='line')
    
    loss_df = pd.DataFrame(loss_data, columns=['Pli', 'Epoch', 'Perte', 'Données'])
    sns.relplot(data=loss_df, x='Epoch', y='Perte', hue='Pli', style='Données', kind='line')

In [None]:
plot_results(CNN_histories)