## Ouverture Pyspark et Librairies

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1727802056491_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Affichage des informations sur la session en cours et liens vers Spark UI :

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1727802056491_0001,pyspark,idle,Link,Link,,✔


## Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

## Import des librairies

In [3]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

# fonctions SQL PySpark
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
from pyspark.ml.feature import PCA
from pyspark.ml.functions import array_to_vector
from pyspark.ml.functions import vector_to_array

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [5]:
PATH = 's3://projet9-data-scientist'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://projet9-data-scientist
PATH_Data:   s3://projet9-data-scientist/Test
PATH_Result: s3://projet9-data-scientist/Results

## Traitement des données

### Chargement des données

**Chargement des données sous forme de données binaires, avec une recherche récursive dans les sous-dossiers, en ne sélectionnant que les fichiers au format .jpg.**

In [6]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://projet9-data...|2024-09-27 09:44:03|  7353|[FF D8 FF E0 00 1...|
|s3://projet9-data...|2024-09-27 09:44:03|  7350|[FF D8 FF E0 00 1...|
|s3://projet9-data...|2024-09-27 09:44:03|  7349|[FF D8 FF E0 00 1...|
|s3://projet9-data...|2024-09-27 09:44:03|  7348|[FF D8 FF E0 00 1...|
|s3://projet9-data...|2024-09-27 09:44:03|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [8]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+---------------------------------------------------------+----------+
|path                                                     |label     |
+---------------------------------------------------------+----------+
|s3://projet9-data-scientist/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://projet9-data-scientist/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://projet9-data-scientist/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://projet9-data-scientist/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://projet9-data-scientist/Test/Watermelon/r_95_100.jpg |Watermelon|
+---------------------------------------------------------+----------+
only showing top 5 rows

None

## Préparation du modèle  


Pour l'extraction des caractéristiques via la technique de transfert d'apprentissage à partir du modèle MobileNetV2 (optimisé pour une exécution rapide) : modèle TensorFlow, 53 couches, suppression de la dernière couche, traitement des images en 224x224. En sortie : un vecteur de caractéristiques de dimensions (1,1,1280), soit 1280 caractéristiques.

**Chargement du modèle MobileNetV2 avec des poids pré-entraînés sur ImageNet, tout en définissant le format des images d'entrée.**

In [9]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

**Création d'un nouveau modèle avec :**  
**en entrée : l'entrée du modèle MobileNetV2**  
**en sortie : la pénultième couche du modèle MobileNetV2**

In [10]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Chargement du nouveau modèle sur le driver avant de distribuer les poids aux différents workers.**

In [12]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

Activation du processus de création du modèle

In [14]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [15]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



**Pandas UDF (User-Defined Function) :**  
@pandas_udf('array', PandasUDFType.SCALAR_ITER) : Ce décorateur définit une fonction UDF qui traite les données en lots, représentées sous forme de séries Pandas. Cela permet d'appliquer la featurisation sur des lots d'images de manière distribuée en tirant parti des capacités de Spark.  
`content_series_iter` est un itérateur sur des lots d'images, permettant de charger une seule fois le modèle de deep learning (via `model_fn()`) et de l'appliquer sur plusieurs lots. Cela améliore les performances en chargeant le modèle une seule fois, minimisant ainsi la surcharge liée aux chargements répétés.

## Exécution des actions d'extraction de features

In [16]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features"),
                                            featurize_udf("content").alias("features_vectors")).withColumn("features_vectors", array_to_vector("features_vectors")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

**Application sur un DataFrame Spark :**  
Le DataFrame `images` est divisé en 24 partitions afin d'optimiser la parallélisation du traitement.  
Les colonnes `path` (chemin de l'image), `label` (étiquette) et les caractéristiques extraites sont sélectionnées et traitées.  
Les résultats sont stockés dans une colonne `features_vectors`, où les caractéristiques extraites sont converties en vecteurs via la fonction `array_to_vector` pour un traitement ultérieur.

In [18]:
features_df.show(5, True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+--------------------+
|                path|         label|            features|    features_vectors|
+--------------------+--------------+--------------------+--------------------+
|s3://projet9-data...|Pineapple Mini|[0.015393682, 4.6...|[0.01539368182420...|
|s3://projet9-data...|    Watermelon|[0.64753217, 0.34...|[0.64753216505050...|
|s3://projet9-data...|    Watermelon|[0.19582352, 0.01...|[0.19582352042198...|
|s3://projet9-data...|    Watermelon|[0.2414487, 0.224...|[0.24144870042800...|
|s3://projet9-data...|    Watermelon|[0.059718654, 0.2...|[0.05971865355968...|
+--------------------+--------------+--------------------+--------------------+
only showing top 5 rows

## Réduction de dimensions, application du PCA

In [19]:
# Number of components
PCA_K = 200
#PCA_K = 100

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Application de l'algorithme PCA (Principal Component Analysis) sur les vecteurs de caractéristiques

pca = PCA(k=PCA_K, inputCol = 'features_vectors', outputCol = 'pca_vectors')
model = pca.fit(features_df)
df_pca = model.transform(features_df)
df_pca.show(5, True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------+--------------------+--------------------+--------------------+
|                path|     label|            features|    features_vectors|         pca_vectors|
+--------------------+----------+--------------------+--------------------+--------------------+
|s3://projet9-data...|Watermelon|[0.5698791, 0.115...|[0.56987911462783...|[-3.6640231821318...|
|s3://projet9-data...|Watermelon|[0.64753217, 0.34...|[0.64753216505050...|[-2.1975559794983...|
|s3://projet9-data...|Watermelon|[0.00888293, 0.19...|[0.00888292957097...|[-2.7703872359664...|
|s3://projet9-data...|Watermelon|[0.0, 0.13640745,...|[0.0,0.1364074498...|[-3.1825366756191...|
|s3://projet9-data...|Watermelon|[0.059718654, 0.2...|[0.05971865355968...|[-2.6389855205280...|
+--------------------+----------+--------------------+--------------------+--------------------+
only showing top 5 rows

In [21]:
# Obtenir la variance expliquée par chaque composante principale
explained_variance = model.explainedVariance

# Calculer la variance expliquée cumulée
variance_cumulee = explained_variance.sum()

# Afficher le résultat
print(f"Proportion de variance expliquée par les 200 premières composantes : {variance_cumulee * 100:.2f}%")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Proportion de variance expliqu?e par les 200 premi?res composantes : 90.74%

**À l'issue du processus de prétraitement, la dimension du vecteur de caractéristiques des images est réduite à 200, tout en conservant 90 % de l'information initiale.**

In [22]:
# ajout d'une colonne transformée vecteur ==> array
# Ajout d'une colonne convertissant le vecteur en tableau

df_pca = df_pca.withColumn('pca_features', vector_to_array('pca_vectors'))
df_pca.show(5, True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|                path|         label|            features|    features_vectors|         pca_vectors|        pca_features|
+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|s3://projet9-data...|    Watermelon|[0.04990552, 0.18...|[0.04990551993250...|[-1.6456158058129...|[-1.6456158058129...|
|s3://projet9-data...|    Watermelon|[0.5698791, 0.115...|[0.56987911462783...|[-3.6640231821318...|[-3.6640231821318...|
|s3://projet9-data...|Pineapple Mini|[0.0377613, 4.532...|[0.03776130080223...|[-6.1077356779684...|[-6.1077356779684...|
|s3://projet9-data...|    Watermelon|[0.00888293, 0.19...|[0.00888292957097...|[-2.7703872359664...|[-2.7703872359664...|
|s3://projet9-data...|    Watermelon|[0.0, 0.13640745,...|[0.0,0.1364074498...|[-3.1825366756191...|[-3.1825366756191...|
+--------------------+--

In [23]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://projet9-data-scientist/Results

## Stockage des résultats

In [24]:
df_pca.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Obtenir le nombre de lignes dans le DataFrame df_pca
row_count = df_pca.count()

print(f"Le nombre de lignes est : {row_count}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Le nombre de lignes est : 22688

###  Suivi de l'avancement des tâches avec le Serveur d'Historique Spark

- Il est possible de suivre l'avancement des tâches en cours avec le serveur d'historique Spark.
- Le serveur permet également de revenir et d'étudier les tâches déjà réalisées afin de déboguer et d'optimiser les futures tâches.
- Lorsque la commande `features_df.write.mode("overwrite").parquet(PATH_Result)` était en cours, nous pouvions observer son état d'avancement.
- Le serveur d'historique Spark offre une vision plus précise de l'exécution des différentes tâches sur les différentes machines du cluster.

---

###  Résiliation de l'instance EMR

- Notre travail est maintenant terminé.
- Le cluster EMR est facturé à la demande, et la facturation continue même lorsque les machines sont inactives.
- Pour optimiser la facturation, nous devons résilier le cluster.
- Commande de résiliation via l'interface AWS :
  - Désactivez le tunnel SSH dans FoxyProxy pour éviter les problèmes de timeout.
  - Cliquez sur "Résilier".
  - La résiliation prend environ 1 minute.

---

###  Cloner le serveur EMR (si besoin)

- Si nous devons à nouveau exécuter notre notebook dans les mêmes conditions, nous pouvons cloner le cluster pour obtenir une copie fonctionnelle en 15 à 20 minutes.
- Deux méthodes pour cloner le cluster :

  - **Depuis l'interface AWS** :
    - Cliquez sur "Cloner".
    - La configuration du cluster est recréée à l'identique.
    - Il est possible de revenir sur les différentes étapes pour apporter des modifications.
    - Lorsque tout est prêt, cliquez sur "Créer un cluster".

  - **Via la ligne de commande (avec AWS CLI installé et configuré)** :
    - Assurez-vous de disposer des droits nécessaires sur le compte AMI utilisé.
    - Cliquez sur "Exporter AWS CLI".
    - Copiez/collez la commande dans un terminal.

---

###  Arborescence du serveur S3 à la fin du projet

- Mon bucket S3 `projet9-data-scientist` à la fin du projet.
- Par souci de lisibilité, les sous-dossiers du répertoire "Test" ne sont pas listés.