# CNNs avec Keras sur des données de paysage

## Vérification de l'utilisation de GPU

Allez dans le menu `Exécution > Modifier le type d'execution` et vérifiez que l'on est bien en Python 3 et que l'accélérateur matériel est configuré sur « GPU ».

In [None]:
!nvidia-smi

## Téléchargement du dataset Landscape depuis un repo git

In [None]:
!git clone https://github.com/nzmonzmp/dataset-landscape.git
!ls dataset-landscape
print("***")
!ls -l dataset-landscape/seg_train
print("***")
!ls -l dataset-landscape/seg_pred

## Import de TensorFlow et des autres librairies nécessaires

In [None]:
import itertools
import os
import pathlib
import random
import typing

import cv2
import matplotlib.pyplot as plt
import numpy
import pandas
import seaborn
import sklearn.utils
import sklearn.metrics
import tensorflow.keras as keras

## Préparation des données

Pour charger nos données, nous allons combiner plusieurs libraires : [OpenCV](https://opencv.org/), [NumPy](https://numpy.org/) et [scikit-learn](https://scikit-learn.org/stable/). Ces librairies seront appelées depuis la fonction `get_images`.

Après avoir chargé chaque image, nous allons passer leur canaux en RGB puis les redimensionner à 150x150, enfin, par défaut, nous retournerons un dataset mélangé grâce à [`sklearn.utils.shuffle`](https://scikit-learn.org/stable/modules/generated/sklearn.utils.shuffle.html).

*Complétez la fonction `get_images` qui va chercher les images dans `dir_path` contenant un dossier par classe. Chaque dossier de classe contient l'ensemble des images de cette classe. Il vous faut attribuer le label correct à chaque image.*

## Solution

In [None]:
label_names = ["buildings", "forest", "glacier", "mountain", "sea", "street"]
label_to_index = {l: i for i, l in enumerate(label_names)}


def get_images(dir_path: pathlib.Path, shuffle: bool = True
              ) -> typing.Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray]:
  images = []
  labels = []
  file_paths  = []

  # On itère sur les sous-dossier de la racine : ils correspondent chacun à une
  # classe
  for subdir_path in dir_path.iterdir():

    # Attribuez le bon label en fonction du nom du dossier "labels"
    # Votre code ici
    label = label_to_index.get(subdir_path.name)

    # On ajoute chaque image du label (dossier) courant à notre dataset
    for image_path in subdir_path.iterdir():
      # Utilisation de OpenCV pour charger l'image
      image = cv2.imread(str(image_path))
      image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
      # En entrée d'un CNN, toutes les images doivent faire la même taille
      image = cv2.resize(image, (150, 150))
      images.append(image)
      labels.append(label)
      file_paths.append(image_path)
  images, labels, file_paths = map(numpy.array, [images, labels, file_paths])

  # Mélange de ces tableaux
  if shuffle:
    images, labels, file_paths = sklearn.utils.shuffle(images,
                                                       labels,
                                                       file_paths)
  return images, labels, file_paths


# get_images(pathlib.Path("dataset-landscape") / "seg_train")

## Appel à `get_images`

In [None]:
images, labels, file_paths = get_images(
    pathlib.Path("dataset-landscape") / "seg_train")

In [None]:
print(f"Forme des images : {images.shape}")
print(f"Forme des labels : {labels.shape}")
print(f"Forme des chemins : {file_paths.shape}")

seaborn.countplot(x=labels)
plt.title("Décomptes des différents labels")
plt.ylabel("Décompte")
plt.xlabel("Label")
plt.show()

In [None]:
# Création de la grille de sous-plots. On donne l'argument figsize pour agrandir
# la taille de la figure qui est petite par défaut
f, ax = plt.subplots(5, 5, figsize=(15, 15))

# On choisit 25 indices au hasard, sans replacement (on ne veut pas afficher la
# même image deux fois)
random_indexes = numpy.random.choice(images.shape[0],
                                     size=(5, 5),
                                     replace=False)

for i in range(5):
  for j in range(5):
    img_index = random_indexes[i, j]
    image = images[img_index]
    label = label_names[labels[img_index]]

    # Affichage avec matplotlib et sa fonction imshow, très pratique en vision par
    # ordinateur
    ax[i, j].imshow(image)
    ax[i, j].set_title(f"Exemple {img_index} ({label})")
    ax[i, j].axis('off')

## Création du modèle

Voici un exemple de CNN « minimaliste »

In [None]:
# Initialisation et définition du modéle

# Le modèle est un empilement de couches où le flux de données est séquentiel
model = keras.models.Sequential()
# Une première couche de 1 convolutions de 3x3 pixels
model.add(keras.layers.Conv2D(1,
                              kernel_size=(3, 3),
                              activation="relu",
                              input_shape=(150, 150, 3)))
# Une couche de max pooling
model.add(keras.layers.MaxPool2D(3,3))

# Une couche de manipulation des tenseurs : suppression de toutes les dimensions
# sauf celle de batch et une autre qui contient toutes les valeurs
model.add(keras.layers.Flatten())

# Une couche de sortie dense avec 6 neurones et un softmax comme activation
model.add(keras.layers.Dense(6, activation="softmax"))

# Compilation du modèle avec la définition de la fonction de perte
model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0001),
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

# Affichage d'un résumé du modèle
model.summary()

## Pouvez-vous expliquer les différents nombres de paramètres ?

### Solution


Premier layer de 1 convolutions : (taille du kernel) * (nb kernel) * (nb canaux en entrée) + (nb biais (= nb kernel)) = (3 * 3 ) * 1 * 3 + 1

Dernier layer dense : (input dim) * (output dim) + (nb biais) = 2401 * 6 + 6



## Apprentissage

Apprenons ce modèle sur nos données ! Dans un premier temps, nous entraînons sur une seule epoch pour simplement vérifier que notre modèle est opérationnel.

In [None]:
# Apprentissage du modèle
training_history = model.fit(images, labels, epochs=1, validation_split=0.30)

## Améliorez cette performance

Inspirez-vous du modèle précédent en rajoutant des couches, en faisant des couches plus petites ou plus grosses.

Visez entre 10 et 20 itérations et mois de 1 minute par itération (pour des raisons évidentes).

On peut considérer l'utilisation d'une couche de dropout juste avant la dernière couche dense pour améliorer la régularisation.

On peut obtenir une précision supérieure à 70% sur la base de validation en un temps raisonnable.

La solution proposée prend $\approx$ 45 secondes par itération pendant 15 itérations et atteint aux alentour de 85% d'accuracy sur la base de validation.

In [None]:
# Vos améliorations ici
model = keras.models.Sequential()
model.add(keras.layers.Conv2D(10,
                              kernel_size=(3, 3),
                              activation="relu",
                              input_shape=(150, 150, 3)))
model.add(keras.layers.MaxPool2D(3,3))
model.add(keras.layers.Flatten())
model.add(keras.layers.Dense(6, activation="softmax"))
model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0001),
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

# Affichage d'un résumé du modèle
model.summary()

# Apprentissage du modèle
training = model.fit(images, labels, epochs=10, validation_split=0.30)


# Plot des métriques d'entraînement
def plot_metrics(history) -> None:
  plt.plot(training.history["accuracy"])
  plt.plot(training.history["val_accuracy"])
  plt.title("Accuracy du modèle")
  plt.ylabel("Accuracy")
  plt.xlabel("Epoch")
  plt.legend(["Entraînement", "Validation"], loc="upper left")
  plt.show()

  plt.plot(training.history["loss"])
  plt.plot(training.history["val_loss"])
  plt.title("Perte du modèle")
  plt.ylabel("Perte")
  plt.xlabel("Epoch")
  plt.legend(["Entraînement", "Validation"], loc="upper right")
  plt.show()


plot_metrics(training.history)

### Solution

In [None]:
conv2d_params = dict(kernel_size=(3,3),
                     activation="relu",
                     kernel_initializer="orthogonal",
                     padding="same")

dense_params = dict(activation="relu", kernel_initializer="orthogonal")

model = keras.models.Sequential()
model.add(keras.layers.Conv2D(200, input_shape=(150, 150, 3), **conv2d_params))
model.add(keras.layers.MaxPool2D(2, 2, padding="same"))
model.add(keras.layers.Conv2D(200, **conv2d_params))
model.add(keras.layers.MaxPool2D(2, 2, padding="same"))
model.add(keras.layers.Conv2D(200, **conv2d_params))
model.add(keras.layers.MaxPool2D(2, 2, padding="same"))
model.add(keras.layers.Conv2D(200, **conv2d_params))
model.add(keras.layers.MaxPool2D(2, 2, padding="same"))
model.add(keras.layers.Conv2D(200, **conv2d_params))
model.add(keras.layers.MaxPool2D(2, 2, padding="same"))
model.add(keras.layers.Conv2D(200, **conv2d_params))
model.add(keras.layers.MaxPool2D(2, 2, padding="same"))
model.add(keras.layers.Conv2D(200, **conv2d_params))
model.add(keras.layers.Flatten())
model.add(keras.layers.Dropout(rate=0.2))
model.add(keras.layers.Dense(200, **dense_params))
model.add(keras.layers.Dropout(rate=0.2))
model.add(keras.layers.Dense(100, **dense_params))
model.add(keras.layers.Dropout(rate=0.2))
model.add(keras.layers.Dense(50, **dense_params))
model.add(keras.layers.Dropout(rate=0.2))
model.add(keras.layers.Dense(6,
                             activation="softmax",
                             kernel_initializer="orthogonal"))

model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0001),
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

# Affichage d'un résumé du modèle
model.summary()

In [None]:
# Apprentissage du modèle
training = model.fit(images,
                     labels,
                     epochs=15,
                     validation_split=0.30,
                     batch_size=128)

# Visualisation des métriques d'entrainement
plot_metrics(training.history)

## Évaluation des performances sur l'ensemble de test

Dans le dossier `seg_test` se trouve un ensemble de données qui n'ont jamais été vues durant l'apprentissage.

On utilisera la méthode `evaluate(X, y)` du modèle pour évaluer la qualité de nos prédictions sur ce dataset.

In [None]:
test_images,test_labels, test_file_paths = get_images(
    pathlib.Path("dataset-landscape") / "seg_test")
model.evaluate(test_images, test_labels, verbose=1)

## Analyse d'erreur

On affiche la matrice de confusion puis on regarde des images mal classées.

In [None]:
def analyze_preds(preds, labels):
  confusion_matrix = sklearn.metrics.confusion_matrix(preds,
                                                      labels,
                                                      normalize="true")
  seaborn.heatmap(confusion_matrix,
                  cmap="rocket_r",
                  xticklabels=label_names,
                  yticklabels=label_names)
  plt.title("Matrice de confusion")
  plt.show()

  seaborn.countplot(x=list(map(lambda x: label_names[x], preds)))
  plt.title("Décomptes des classes prédites")
  plt.ylabel("Décompte")
  plt.xlabel("Class")
  plt.show()


test_pred = numpy.argmax(model.predict(test_images), axis=-1)
analyze_preds(test_pred, test_labels)

In [None]:
def plot_mistakes(predicted_class: str, true_class: str) -> None:
  print(f"Prédiction : {predicted_class}, classe réelle : {true_class}")
  mistakes = test_images[(test_pred == label_to_index[predicted_class])
                         & (test_labels == label_to_index[true_class])]
  random_indexes = numpy.random.choice(mistakes.shape[0],
                                       size=min(mistakes.shape[0], 25),
                                       replace=False)
  grid_indexes = itertools.product(range(5), repeat=2)

  _, ax = plt.subplots(5, 5, figsize=(15, 15))
  for img_index, (i, j) in zip(random_indexes, grid_indexes):
    ax[i, j].imshow(mistakes[img_index])
    ax[i, j].axis("off")
  plt.show()

In [None]:
# Plot les images prédites glacier alors qu'elles ont un label montagne
plot_mistakes("glacier", "mountain")

In [None]:
# Plot les images prédites glacier alors qu'elles ont un label mer
plot_mistakes("glacier", "sea")

In [None]:
# Plot les images prédites bâtiment alors qu'elles ont un label mer
plot_mistakes("buildings", "sea")

## Transfert d'apprentissage

In [None]:
base_model = keras.applications.EfficientNetB7(include_top=False,
                                               weights="imagenet",
                                               input_shape=(150, 150, 3))

base_model.trainable = False

model = keras.Sequential(
    [base_model,
     keras.layers.SpatialDropout2D(0.1),
     keras.layers.Conv2D(1024, 1, activation="relu"),
     keras.layers.SpatialDropout2D(0.1),
     keras.layers.Conv2D(256, 1, activation="relu"),
     keras.layers.SpatialDropout2D(0.1),
     keras.layers.Conv2D(64, 1, activation="relu"),
     keras.layers.SpatialDropout2D(0.1),
     keras.layers.Conv2D(16, 1, activation="relu"),
     keras.layers.SpatialDropout2D(0.1),
     keras.layers.Flatten(),
     keras.layers.Dense(6, activation="softmax", kernel_regularizer="l2")])

model.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0001),
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])

model.summary()

In [None]:
training = model.fit(images,
                     labels,
                     epochs=5,
                     validation_split=0.30,
                     batch_size=512)

plot_metrics(training.history)

In [None]:
model.evaluate(test_images, test_labels, verbose=1)
test_preds = numpy.argmax(model.predict(test_images), axis=-1)
analyze_preds(test_preds, test_labels)
plot_mistakes("glacier", "mountain")
plot_mistakes("glacier", "sea")
plot_mistakes("buildings", "sea")

## Prédire dans des condition « réelles »

Dans le dossier `seg_pred` se trouvent des images non-annotées. On ne peut donc pas évaluer correctement les performances sur cet ensemble. 

Cependant, on peut afficher des photos et les probabilités que notre modèle attribue à chaque classe.

In [None]:
pred_images, _, pred_file_paths = get_images(
    pathlib.Path("dataset-landscape") / "seg_pred")
pred_images.shape

In [None]:
# Création de la grille de sous-plots. On donne l'argument figsize pour agrandir
# la taille de la figure qui est petite par défaut
_, ax = plt.subplots(10, 5, figsize=(30, 45))

# On choisit 25 indices au hasard, sans replacement (on ne veut pas afficher la
# même image deux fois)
random_indexes = numpy.random.choice(pred_images.shape[0],
                                     size=(5, 5),
                                     replace=False)

for i in range(5):
  for j in range(5):
    img_index = random_indexes[i, j]
    # Récupération de l'image et prédiction de sa classe
    image = pred_images[img_index]
    probabilities = model.predict(image[None, ...])[0]
    predicted_class = label_names[numpy.argmax(probabilities)]

    # Affichage avec matplotlib et sa fonction imshow, très pratique en vision
    # par ordinateur
    ax[i * 2, j].imshow(image)
    ax[i * 2, j].set_title(f"Exemple {img_index}")
    ax[i * 2, j].axis('off')

    # Affichage de la distribution de prédiction sur la ligne d'en dessous
    ax[i * 2 + 1, j].bar(label_names, probabilities)