<a href="https://colab.research.google.com/github/arthurst38/deep_learning/blob/main/Seq2Seq_RNN_Calculator_Keras.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Utilisation des RNNs avec Keras sur des équations mathématiques

## Vérification de l'utilisation de GPU

Allez dans le menu `Exécution > Modifier le type d'execution` et vérifiez que l'on est bien en Python 3 et que l'accélérateur matériel est configuré sur « GPU ».

In [None]:
!nvidia-smi

## Import de TensorFlow et des autres librairies nécessaires

In [None]:
import operator
import random
import typing

import matplotlib.pyplot as plt
import numpy
import seaborn
import sklearn.model_selection
import tensorflow.keras as keras

## Définition du vocabulaire

Dans ces travaux pratiques, nous allons définir des équations, comme `3 + 1 = 4`. Le modèle devra prédire `4` avec comme entrée `3 + 1`.

Pour commencer, créons le vocabulaire :

In [None]:
operations = list("+*-/")
numbers = list("0123456789.")
padding = [" "]

index_to_char = numbers + operations + padding
char_to_index = {c: i for i, c in enumerate(index_to_char)}

print(f"Index vers caractère : {index_to_char}")
print(f"Caractère vers index : {char_to_index}")

## Utilisation du vocabulaire pour encoder et décoder des équations

Nous pouvons maintenant utiliser ce vocabulaire pour transformer des équations textuelles en suite de chiffres, compréhensibles par un réseau de neurones :

In [None]:
def encode(characters: str) -> numpy.ndarray:
  return numpy.array([char_to_index[char] for char in characters])


def decode(array: numpy.ndarray):
  return ''.join(index_to_char[i] for i in array)

*Testez les fonctions `encode` et `decode`. Pensez-vous que le réseau de neurones pourra travailler directement sur la sortie de `encode` ou faudra-t-il appliquer un prétraitement supplémentaire ?*

In [None]:
# Votre code de test ici

Votre réponse ici

### Solution

In [None]:
equation = "3/4+2-5"
result = "0"
print("─" * 50)
print("Équation")
print("─" * 50)
print(f"Forme brute       {equation}")
print(f"Encodage          {encode(equation)}")
print(f"Encodage/décodage {decode(encode(equation))}")
print()
print("─" * 50)
print("Résultat")
print("─" * 50)
print(f"Forme brute       {result}")
print(f"Encodage          {encode(result)}")
print(f"Encodage/décodage {decode(encode(result))}")

Cet encodage est insuffisant pour le traitement par des réseaux de neurones : il faudra au choix one-hot encoder en sus ou passer par une couche d'embeddings.

## Génération de données

Nous pouvons maintenant procéder à la génération d'exemples, qui serviront pour l'entraînement, la validation et le test.

Pour faire cela, nous allons sélectionner aléatoirement une opération parmi les 4 définies et générer des entiers aléatoires pour appliquer cette opération. Ce processus sera répété jusqu'à atteindre le nombre souhaité d'exemples.

In [None]:
def make_maths_problem(n_samples=1000,
                       n_digits=3,
                       invert=True):
  equations = []
  results = []
  
  math_operation = {"+": operator.add, 
                    "-": operator.sub,
                    "*": operator.mul,
                    "/": operator.truediv}

  # Taille maximale que peut faire la chaîne décrivant l'opération
  max_equation_len = 2 * n_digits + 1
  max_result_len = 2 * n_digits

  while len(equations) < n_samples:
    # Sélection d'une opération aléatoire
    operation = numpy.random.choice(operations)

    # Génération de deux entiers qui respectent la limite de taille
    left, right = numpy.random.randint(10 ** n_digits, size=2)

    equation = f"{left}{operation}{right}"

    if equation not in equations:
      # Calcul du résultat. Étant donné que left et right sont des entiers
      # numpy, ce calcul ne cause pas d'exception, même en cas de division par 0
      math_result = math_operation[operation](left, right)

      # On recommence si le résultat n'est pas exploitable
      if math_result == numpy.inf or numpy.isnan(math_result):
        continue

      # Le résultat peut-être très grand (0.3333333…), on limite sa taille
      result = str(math_result)[:max_result_len]

      # On « pad » pour que toutes les séquences fassent la même taille
      padded_equation = equation.ljust(max_equation_len)
      padded_result = result.ljust(max_result_len)

      # On inverse si l'argument invert est donné
      if invert:
        padded_equation = padded_equation[::-1]

      equations.append(padded_equation)
      results.append(padded_result)
  
  X = numpy.array(list(map(encode, equations)))
  y = numpy.array(list(map(encode, results)))
  return X, y

Testons cette fonction avec une dizaine d'exemples :

In [None]:
X, y = make_maths_problem(10, n_digits=3)

print(f"Forme de X : {X.shape}")
print(f"Forme de y : {y.shape}")

print()
print("Quelques exemples générés")
for encoded_equation, encoded_result in zip(X, y):
    # Par défault l'inversion des équations est activée, il faut la défaire
    # pour pouvoir visualiser l'équation originale
    equation = decode(encoded_equation[::-1])
    result = decode(encoded_result)
    print(f"{equation} = {result}")

Nous pouvons maintenant créer le dataset et les splits nécessaires :

In [None]:
X, y = make_maths_problem(100_000, n_digits=3)
X_train, X_test, y_train, y_test = sklearn.model_selection.train_test_split(
    X, y, train_size=0.8)

## Définition du modèle

Pour traiter ces séquences de caractères, nous allons utiliser un RNN. Comme dit auparavant, il sera nécessaire de one-hot encoder les séquences d'entrée ou de les passer par une couche d'embeddings. Ici, nous utiliserons la couche d'embeddings.

In [None]:
def rnn_model(hidden_size=1024, n_digits=3):
  model = keras.models.Sequential()
  # Encodeur
  model.add(keras.layers.Embedding(len(index_to_char), 16))
  model.add(keras.layers.GRU(hidden_size, return_sequences=True))
  model.add(keras.layers.GRU(hidden_size))
  
  # Décodeur
  # La couche RepeatVector permet de dupliquer la sortie de l'encodeur autant de
  # fois que l'on souhaite de caractères d'output. C'est un moyen simple de
  # conditionner le décodage du résultat sur l'encodage de l'équation
  model.add(keras.layers.RepeatVector(2 * n_digits))
  # Notez bien l'argument return_sequences=True, sans celui-ci nous ne
  # produirions qu'une seule sortie
  model.add(keras.layers.GRU(hidden_size, return_sequences=True))

  # On applique une couche de sortie dense à chaque timestep
  model.add(keras.layers.TimeDistributed(
      keras.layers.Dense(len(index_to_char), activation="softmax")))

  model.compile(loss="sparse_categorical_crossentropy",
                optimizer="adam",
                metrics=["accuracy"])
  return model


model = rnn_model()

## Entraînement du modèle

On affichera régulièrement des prédictions sur des exemples stables pour voir l'évolution du modèle.

In [None]:
# On utilisera les mêmes exemples à chaque évaluation pour voir le modèle
# progresser
n_test = 20
indexes = numpy.random.choice(X_test.shape[0], replace=False, size=n_test)
X_examples, y_examples = X_test[indexes], y_test[indexes]

In [None]:
def examples() -> None:
  guesses = numpy.argmax(model.predict(X_examples, verbose=0), axis=-1)
  for encoded_equation, encoded_result, encoded_guess in zip(
      X_examples, y_examples, guesses):
    equation = decode(encoded_equation[::-1])
    result = decode(encoded_result)
    guess = decode(encoded_guess)
    print(f"{equation} = {result} ?= {guess}")

for i in range(10):
  model.fit(X_train,
            y_train,
            batch_size=4096,
            epochs=10,
            verbose=True,
            validation_data=(X_test, y_test))
  print()
  print("─" * 50)
  print(f"Après {(i + 1) * 10} epochs :")
  examples()

## Évaluation du modèle

Pour évaluer notre modèle, nous allons calculer 3 éléments sur l'ensemble de test :

- les prédictions illégales (quand la sortie du modèle ne peut pas être interprétée comme un flottant)
- l'accuracy
- l'erreur fractionnelle ($\frac{y_{pred} - y}{y}$)

In [None]:
def evaluate() -> None:
  # Calcul des prédictions du modèle
  predictions = numpy.argmax(model.predict(X_test), axis=-1)

  # Décodage des prédictions du modèle en chaîne de caractères
  str_predictions = numpy.apply_along_axis(decode, axis=1, arr=predictions)

  # Décodage des prédictions du modèle en flottants
  # Parfois la chaîne de caractères émise ne représente pas un flottant valide
  # On crée un masque booléen pour pouvoir filtrer ces éléments à posteriori
  illegal_predictions_mask = numpy.zeros(X_test.shape[0], numpy.bool)
  float_predictions = []
  for i, str_prediction in enumerate(str_predictions):
    try:
      float_prediction = float(str_prediction)
      float_predictions.append(float_prediction)
    except ValueError:
      illegal_predictions_mask[i] = True
  illegal_ratio = illegal_predictions_mask.sum() / X_test.shape[0]
  print(f"Pourcentage de prédictions illégales : {illegal_ratio * 100}%")

  y_pred = numpy.array(float_predictions, numpy.float32)
  y = numpy.apply_along_axis(decode, 1, y_test).astype(numpy.float32)
  y = y[~illegal_predictions_mask]

  accuracy = (y == y_pred).sum() / X_test.shape[0]
  print(f"Accuracy : {accuracy:.2f}")

  y_denominator = numpy.copy(y)
  y_denominator[y == 0] = 1

  fractional_difference = (y_pred - y) / y_denominator

  print(f"Moyenne de l'erreur fractionnelle : {fractional_difference.mean()}")

  plt.hist(fractional_difference, bins=1000)
  plt.title("Vue globale de la distribution des erreurs fractionnelles")
  plt.xlabel("Erreur fractionnelle")
  plt.ylabel("Décompte d'exemples")
  plt.xscale("symlog")
  plt.yscale("log")
  plt.show()

  plt.hist(fractional_difference, bins=20, range=(-0.1, 0.1))
  plt.title("Vue zoomée sur 0 de la distribution des erreurs fractionnelles")
  plt.xlabel("Erreur fractionnelle")
  plt.ylabel("Décompte d'exemples")
  plt.show()


evaluate()