# **Réalisez un traitement dans un environnement Big Data sur le Cloud**

Puisque les tests en local se sont bien déroulés, nous allons maintenant exécuter notre script dans un environnement cloud sur un plus grand nombre d'images.

Nous allons passer par la plateforme [AWS](https://aws.amazon.com/fr/) et utiliser deux services : 
- [EMR](https://aws.amazon.com/fr/emr/) : notre cluster de machine qui exécutera notre notebook avec Spark, grâce à JupyterHub qui sera installé sur le driver.
- [S3](https://aws.amazon.com/fr/s3/) : notre service de stockage qui contiendra nos images à traiter, le notebook à exécuter, les logs d'exécutions de Spark et nos fichiers de sortie.

## **Démarrage de la session Spark**

Le code sera exécuté depuis **JupyterHub hébergé sur un cluster EMR AWS**.

<u>Avant de commencer</u>, **il faut s'assurer d'utiliser le kernel PySpark**.

**En utilisant ce kernel, une session spark est créé à l'exécution de la première cellule**. Les variable *spark* et *sc* seront déjà disponibles.

Il n'est donc **plus nécessaire d'exécuter le code "spark = (SparkSession ..."** comme dans notre notebook exécuté en local.

In [None]:
# L'exécution de cette cellule démarre l'application Spark

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [None]:
%%info

## **Import des librairies**

In [None]:
# Pour lire les images qui auront été chargées au format binaire : 
import io
# Pour la gestion des chemins de fichiers
import os

# Pour valider les résultats et les exporter en CSV
import pandas as pd
# Pour redimensionner les images
from PIL import Image
# Pour la manipulation d'arrays
import numpy as np

# Pour l'extraction de features des images
import tensorflow as tf
# Sur EMR AWS, avec le kernel pyspark, keras a été installé à part (lors du bootstrap)
# "from tensorflow.keras..." , ne fonctionne pas; il fait faire "from keras..."
from keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from keras.preprocessing.image import img_to_array
from keras import Model

# Création et manipulation de dataframes Spark
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

# Pour convertion des features en vecteurs Spark
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
# Pour réalisation de la PCA sur les features
from pyspark.ml.feature import PCA

## **1. Définition des PATH pour charger les images et enregistrer les résultats**

In [None]:
PATH = 's3://data-p8'
PATH_Data = PATH + '/Test_med'
PATH_Result = PATH + '/Results'
print('PATH :        ' + PATH\
      + '\nPATH_Data :   ' + PATH_Data \
      + '\nPATH_Result : ' + PATH_Result)

## **2. Traitement des données**

### **2.1 Chargement des données**

Les images sont chargées au format binaire, ce qui offre plus de souplesse dans la façon de prétraiter les images.

Avant de charger les images, nous spécifions que nous voulons charger uniquement les fichiers dont l'extension est **jpg**.

Nous indiquons également de charger tous les objets possibles contenus dans les sous-dossiers du dossier communiqué ("recursiveFileLookup").

In [None]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

Vérifions que le chargement s'est bien passé : 

In [None]:
images.show(5)

Ajout d'une colonne contenant les **labels** de chaque image :

In [None]:
# split(images['path'], '/'),-2) : couper le chemin complet à chaque '/'
# et prendre l'avant dernier élément, c'est-à-dire le nom du dossier
# qui contient les images
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())  # Affichage de la structure du dataframe
print(images.select('path','label').show(5,False)) # True si on veut tronquer les colonnes lors de l'affichage

### **2.2 Préparation du modèle**

Nous allons utiliser la technique du **transfert learning** pour extraire les features des images.

Nous allons utiliser le modèle **MobileNetV2** pour sa rapidité d'exécution comparée à d'autres modèles comme *VGG16* par exemple. Nous allons donc récupérer l'avant dernière couche du modèle en sortie. La dernière couche, avec sa fonction d'activation *Softmax*, est destinée à la classification, ce que nous ne souhaitons pas ici.

**MobileNetV2**, lorsqu'on l'utilise en incluant toutes ses couches, attend obligatoirement des images de dimension (224,224,3). Nos images étant toutes de dimension (100,100,3), nous devrons simplement les **redimensionner** avant de les confier au modèle.

<u>Dans l'odre</u> :
 1. Nous chargeons le modèle **MobileNetV2** avec les poids **précalculés** issus d'**imagenet** et en spécifiant le format de nos images en entrée
 2. Nous créons un nouveau modèle avec :
  - <u>en entrée</u> : l'entrée du modèle MobileNetV2
  - <u>en sortie</u> : l'avant dernière couche du modèle MobileNetV2

In [None]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [None]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

Affichage du résumé de notre nouveau modèle où nous constatons que <u>nous récupérons en sortie un vecteur de dimension (1, 1, 1280)</u> :

In [None]:
new_model.summary()

Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids. <br />
Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser ensuite les poids aux différents workeurs.

In [None]:
brodcast_weights = sc.broadcast(new_model.get_weights())

<u>Mettons cela sous forme de fonction</u> :

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)  # Diffusion des poids
    return new_model

### **2.3 Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF**

Ce notebook définit la logique par étapes, jusqu'à Pandas UDF.

<u>L'empilement des appels est la suivante</u> :

- Pandas UDF
  - featuriser une série d'images pd.Series
   - prétraiter une image

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # Chargement des images au format binaire et redimensionnement
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)  # Conversion en array
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # Pour certaines couches, les caractéristiques de sortie seront des tenseurs multidimensionnels.
    # Nous aplatissions les tenseurs de caractéristiques en vecteurs pour faciliter le stockage dans les DataFrames Spark.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # Avec PandasUDFType.SCALAR_ITER, nous pouvons charger le modèle une fois puis le réutiliser
    # pour plusieurs lots de données. Cela amortit les coûts liés au chargement des modèles volumineux.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

### **2.4 Exécution des actions d'extraction de features**

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), peuvent rencontrer des erreurs de type Out Of Memory (OOM).

Si de telles erreurs arrivent ci-dessous, il est possible de réduire la taille du lot Arrow via 'maxRecordsPerBatch'

Il ne devrait pas y avoir de problème ici, je laisse donc la commande en commentaire.

In [None]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Il s'agit ici d'une étape de transformation dans Spark, autrement dit l'extraction réelle des features n'aura pas encore lieu, elle sera déclenchée par une action plus tard.

In [None]:
# Choix du nombre de partitions que l'on va créer avec "images"
nb_partitions = 24

features_df = images.repartition(nb_partitions).select(col("path"),
                                                       col("label"),
                                                       featurize_udf("content").alias("features")
                                                      )

Affichage de la structure du dataframe *features_df* : 

In [None]:
features_df.printSchema()

### **2.5 Réalisation de la PCA**

Conversion de la colonne features en vecteurs, car c'est le format d'entrée requis pour faire une PCA avec Spark : 

In [None]:
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
features_df = features_df.withColumn('features', list_to_vector_udf('features'))

On choisit le nombre de composantes à garder avec la PCA : 

In [None]:
n_componants = 20

Réalisation de la PCA, ce qui va constituer une action qui va déclencher les calculs de featurisation.

En effet, pour réaliser la PCA, l'ensemble des features doit être disponible pour pouvoir calculer la matrice de covariance, les vecteurs propres et les valeurs propres associées.

Par conséquent, on ne peut pas effectuer une PCA au fur et à mesure que les valeurs sont créées.

In [None]:
pca = PCA(k=n_componants, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(features_df)
features_df = model.transform(features_df).select("path", "label", "features", "pcaFeatures")

Affichage de la structure du dataframe *features_df* après la PCA : 

In [None]:
features_df.printSchema()

## **3. Écriture des résultats sous forme de fichiers**

In [None]:
print(f"Les résultats iront ici :\n{PATH_Result}")

Enregistrement des données traitées au format "**parquet**" :

In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

## **4. Chargement des données enregistrées, validation des résultats et export des composantes PCA dans un fichier CSV**

### **4.1 Chargement des données**

On charge les données qui ont été enregistrées au format parquet dans un **DataFrame Pandas** :

In [None]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

On ne va garder que les noms de fichiers au lieu de tout le chemin : 

In [None]:
# Séparer les éléments de 'path' et ne garder que le dernier (nom du fichier)
df['path'] = df['path'].apply(lambda x: x.split('/')[-1])
# Renommer la colonne 'path' en 'filename'
df = df.rename(columns={'path': 'filename'})

Ici les valeurs prennent la forme d'un dictionnaire, car *features* avait été converti en vecteurs spark

In [None]:
df['features'][0]

Nous souhaitons seulement avoir les valeurs : 

In [None]:
df['features'] = df['features'].apply(lambda x: x['values'] if x is not None else None)
df['pcaFeatures'] = df['pcaFeatures'].apply(lambda x: x['values'] if x is not None else None)

### **4.2 Validation des résultats**

On valide que la dimension du vecteur de caractéristiques ('features') des images est bien de dimension 1280, c'est-à-dire la dimension de l'avant dernière couche du modèle *MobileNetV2* :

In [None]:
df.loc[0,'features'].shape

On valide dimension du vecteur de caractéristiques après PCA ('pcaFeatures'), qui doit correspondre au nombre choisi de composantes lors de la PCA : 

In [None]:
df.loc[0,'pcaFeatures'].shape

Contenu de notre dataframe Pandas : 

In [None]:
df.head()

### **4.3 Export des composantes PCA dans un ficheir CSV**

Enregistrement des pcaFeatures (associées aux noms de fichier et labels) au format CSV : 

In [None]:
# np.printoptions pour éviter l'insertion de "\n" dans 'pcaFeatures' dans notre fichier csv
with np.printoptions(linewidth=10000):
    df[['filename', 'label', 'pcaFeatures']].to_csv(PATH_Result+'/'+'pcaFeatures.csv', index=False, sep='\t')