# DEPLOIEMENT DU MODELE DANS LE CLOUD

### Contexte
Vous êtes Data Scientist dans une très jeune start-up de l'AgriTech, nommée  "Fruits!", qui cherche à proposer des solutions innovantes pour la récolte des fruits.

La volonté de l’entreprise est de préserver la biodiversité des fruits en permettant des traitements spécifiques pour chaque espèce de fruits en développant des robots cueilleurs intelligents.

Votre start-up souhaite dans un premier temps se faire connaître en mettant à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

Pour la start-up, cette application permettrait de sensibiliser le grand public à la biodiversité des fruits et de mettre en place une première version du moteur de classification des images de fruits.

De plus, le développement de l’application mobile permettra de construire une première version de l'architecture Big Data nécessaire.

### Mission

Vous êtes donc chargé de vous approprier les travaux réalisés par l’alternant et de compléter la chaîne de traitement.

Il n’est pas nécessaire d’entraîner un modèle pour le moment.

L’important est de mettre en place les premières briques de traitement qui serviront lorsqu’il faudra passer à l’échelle en termes de volume de données !

### Contraintes

Lors de son brief initial, Paul vous a averti des points suivants :

- Vous devrez tenir compte dans vos développements du fait que le volume de données va augmenter très rapidement après la livraison de ce projet. Vous continuerez donc à développer des scripts en Pyspark et à utiliser le cloud AWS pour profiter d’une architecture Big Data (EMR, S3, IAM). Si vous préférez, vous pourrez transférer les traitements dans un environnement Databricks.

- Vous devez faire une démonstration de la mise en place d’une instance EMR opérationnelle, ainsi qu’ expliquer pas à pas le script PySpark, que vous aurez complété :
  - d’un traitement de diffusion des poids du modèle Tensorflow sur les clusters (broadcast des “weights” du modèle).
  - d’une étape de réduction de dimension de type PCA en PySpark.

- Vous respecterez les contraintes du RGPD : dans notre contexte, vous veillerez à paramétrer votre installation afin d’utiliser des serveurs situés sur le territoire européen.

- Votre retour critique de cette solution sera également précieuse, avant de décider de la généraliser.

- La mise en œuvre d’une architecture Big Data de type EMR engendrera des coûts. Vous veillerez donc à ne maintenir l’instance EMR opérationnelle que pour les tests et les démos.

### A NOTER

<u>Afin de limiter les coûts, il nous a été stipulé que nous pouvions restreindre le jeu de données. Ainsi, 10 fruits ont été sélectionnés avec 10 images par fruit, soit un total de 100 images.<u>

# 1. Démarrage de la session Spark et import des librairies

## 1.1. Démarrage de la session Spark

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4,application_1702223184396_0005,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Affichage des informations sur la session en cours et liens vers Spark UI<u>

Spark UI surveille et diagnostique les applications Spark en cours d'exécution

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4,application_1702223184396_0005,pyspark,idle,Link,Link,,✔


## 1.2. Import des librairies

In [3]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 2. Définition des PATH pour le chargement des images et l'enregistrement des résultats

<u>Nous accédons directement à nos données sur S3 comme si elles étaient stockées localement.<u>

In [4]:
PATH = 's3://p8-data-cloud'
PATH_Data = PATH+'/Fruits_Cloud'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://p8-data-cloud
PATH_Data:   s3://p8-data-cloud/Fruits_Cloud
PATH_Result: s3://p8-data-cloud/Results

# 3. Traitement des données

## 3.1. Chargement des données

In [5]:
# Chargement des images avec l'extension .jpg sous format binaire présentes dans les répertoires et sous-répertoires
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Affichage de 5 images
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8-data-clou...|2023-12-07 14:48:50|  6688|[FF D8 FF E0 00 1...|
|s3://p8-data-clou...|2023-12-07 14:48:50|  6663|[FF D8 FF E0 00 1...|
|s3://p8-data-clou...|2023-12-07 14:48:50|  6610|[FF D8 FF E0 00 1...|
|s3://p8-data-clou...|2023-12-07 14:48:49|  6606|[FF D8 FF E0 00 1...|
|s3://p8-data-clou...|2023-12-07 14:48:51|  6592|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Conservation unique du **path** et ajout d'une colonne contenant les **labels** de chaque image.<u>

In [7]:
# Ajout d'une nouvelle colonne 'label' au dataframe images
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))

# Impression des résultats
images.select('path','label').show(5,False)

# Impression du schéma du dataframe
print(images.printSchema())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------------------------------------------------------+-----------+
|path                                                     |label      |
+---------------------------------------------------------+-----------+
|s3://p8-data-cloud/Fruits_Cloud/Cauliflower/r_228_100.jpg|Cauliflower|
|s3://p8-data-cloud/Fruits_Cloud/Cauliflower/r_251_100.jpg|Cauliflower|
|s3://p8-data-cloud/Fruits_Cloud/Cauliflower/r_304_100.jpg|Cauliflower|
|s3://p8-data-cloud/Fruits_Cloud/Cauliflower/r_158_100.jpg|Cauliflower|
|s3://p8-data-cloud/Fruits_Cloud/Cauliflower/r_4_100.jpg  |Cauliflower|
+---------------------------------------------------------+-----------+
only showing top 5 rows

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None

<u>Vérification du nombre d'images (attendu 100)<u>

In [8]:
# Obtention du nombre de lignes (donc images) avec .count() dans Pyspark (en non .shape[0])
images.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

100

## 3.2. Préparation du modèle

Dans un réseau de neurones, chaque connexion entre deux neurones a un poids associé. Ces poids sont ajustés pendant la phase d'apprentissage du modèle afin d'optimiser les performances du réseau.

Dans ce contexte de calcul distribué, la diffusion des poids (brodcasting) dans permet un partage efficace de ces derniers entre différents nœuds de calcul, ce qui est souvent nécessaire pour accélérer l'entraînement de modèles profonds sur de grands ensembles de données.

In [9]:
# Ouverture du modèle MobileNetV2 avec l'ensemble des couches
model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))

# Suppression de la dernière couche du cmodèle (ici -2)
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

# Diffusion des poids du modèle
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Résumé du modèle
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

<u>Fonction pour le modèle<u>

In [11]:
def model_fn():
    """
    Retourne un modèle MobileNetV2 sans la dernière couche
    et les poids diffusés pré-entraînés.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 3.3. Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

La dernière fonction @pandas_udf applique une transformation itérative sur une colonne du DataFrame en utilisant un modèle chargé une fois pour plusieurs lots de données, ce qui améliore les performances en évitant de recharger le modèle à chaque itération. La transformation elle-même est une extraction de caractéristiques d'une série de données d'image à l'aide du modèle chargé.

In [12]:
# Redimensionnement de nos images en 224x224 pixels (taille originale 100x100 pixels)
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

# Obtention des caractériques des images sous forme de pd.series après prédiction par le modèle
def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



# 4. Extraction des features des images

<u>Comme précédemment stipulé, l'extraction des caractéristiques des images sera réalisée sur 100 images: 10 fruits avec 10 images par fruit. Ces images sont issues de jeu de données test.<u>

## 4.1. Extraction des features des 100 images

In [13]:
# Extraction des features en utilisant 20 exécuteurs
features_df = images.repartition(20).select(
    col("path"),
    col("label"),
    featurize_udf("content").alias("features")
)

# Visualisation de 5 lignes au hasard du DataFrame obtenu
features_df.show(5, truncate=True)

# Vérification du nombre d'images (nombre de lignes: attendu 100)
features_df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+
|                path|         label|            features|
+--------------------+--------------+--------------------+
|s3://p8-data-clou...|          Lime|[0.10586139, 0.00...|
|s3://p8-data-clou...|          Pear|[0.24130498, 0.0,...|
|s3://p8-data-clou...|          Lime|[0.0, 0.0, 0.0569...|
|s3://p8-data-clou...|Apple Golden 2|[0.063913345, 0.0...|
|s3://p8-data-clou...|   Onion white|[0.12695207, 0.0,...|
+--------------------+--------------+--------------------+
only showing top 5 rows

100

## 4.2. Réalisation de la PCA avec 137 composantes

Lors du test en local, nous avons pu déterminer que:
1.   Les caractéristiques des images doivent être converties en vecteur avant la réalisation de la PCA.
2.   L'atteinte de 99% de la variance expliquée nécessitait 137 composantes.


<u>Réalisation de la PCA après vectorisation des "features"<u>

In [14]:
# Définition d'une fonction UDF pour convertir la colonne "features" en vecteur
array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# Application de la fonction UDF pour la création d'une nouvelle colonne "features_vector"
features_df = features_df.withColumn("features_vector", array_to_vector_udf(features_df["features"]))

# Création d'un objet PCA avec les 137 composantes principales pour atteindre 99% de la variance expliquée
pca = PCA(k=137, inputCol="features_vector", outputCol="vectorized_pca_features")

# Application de la PCA sur le DataFrame
model = pca.fit(features_df)
pca_features_df = model.transform(features_df)

# Sélection des colonnes pertinentes et affichage de 5 lignes au hasard
features_df_pca = pca_features_df.select("path", "label", "vectorized_pca_features")
features_df_pca.show(5, truncate=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+-----------------------+
|                path|         label|vectorized_pca_features|
+--------------------+--------------+-----------------------+
|s3://p8-data-clou...|          Lime|   [-8.4689094507281...|
|s3://p8-data-clou...|Apple Golden 2|   [-7.4866635200894...|
|s3://p8-data-clou...|          Lime|   [-6.7544818278395...|
|s3://p8-data-clou...|          Pear|   [-7.7287299544243...|
|s3://p8-data-clou...|   Onion white|   [11.3108575033495...|
+--------------------+--------------+-----------------------+
only showing top 5 rows

<u>Restructuration/dévectorisation des features après PCA.<u>

In [15]:
from pyspark.sql.types import ArrayType, FloatType

# Définition explicite de la fonction UDF
def vector_to_array(vec):
    return vec.toArray().tolist()

# Conversion de la fonction Python en UDF
vector_to_array_udf = udf(vector_to_array, ArrayType(FloatType()))

# Application de la fonction UDF pour la création d'une nouvelle colonne "pca_features"
final_df = features_df_pca.withColumn("pca_features", vector_to_array_udf("vectorized_pca_features"))

# Sélection des colonnes pertinentes et affichage des 5 premières lignes
final_df = final_df.select("path", "label", "pca_features")
final_df.show(5, truncate=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+
|                path|         label|        pca_features|
+--------------------+--------------+--------------------+
|s3://p8-data-clou...|          Lime|[-6.754482, 1.691...|
|s3://p8-data-clou...|          Pear|[-7.7287297, 3.23...|
|s3://p8-data-clou...|          Lime|[-8.468909, 1.793...|
|s3://p8-data-clou...|Apple Golden 2|[-7.4866633, -1.4...|
|s3://p8-data-clou...|        Walnut|[6.6853433, 3.577...|
+--------------------+--------------+--------------------+
only showing top 5 rows

In [16]:
# Affichage du schéma du DataFrame pour identifier le type de la colonne pca_features (normalement array)
final_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- pca_features: array (nullable = true)
 |    |-- element: float (containsNull = true)

In [17]:
# Localisation des résultats
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://p8-data-cloud/Results

<u>Enregistrement des données traitées au format "**parquet**"</u> :

In [18]:
# Enregistrement des données au format 'parquet'
final_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 5. Chargement des données enregistrées et validation du résultat

<u>Chargement des données fraichement enregistrées dans un **DataFrame Pandas**</u>

In [19]:
# Chargement des données depuis mon cloud :
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Affichage des 5 premières lignes du DataFrame</u>

In [20]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                       pca_features
0   s3://p8-data-cloud/Fruits_Cloud/Lime/262_100.jpg  ...  [-6.754482, 1.6913179, -6.994117, -0.10847554,...
1   s3://p8-data-cloud/Fruits_Cloud/Pear/r_7_100.jpg  ...  [-7.7287297, 3.235324, -11.676723, -0.0947821,...
2  s3://p8-data-cloud/Fruits_Cloud/Lime/r_52_100.jpg  ...  [-8.468909, 1.7938887, -8.9465885, 0.9497765, ...
3  s3://p8-data-cloud/Fruits_Cloud/Apple Golden 2...  ...  [-7.4866633, -1.4561422, -7.9126353, 5.1878214...
4  s3://p8-data-cloud/Fruits_Cloud/Walnut/12_100.jpg  ...  [6.6853433, 3.577582, -12.337966, -6.595814, -...

[5 rows x 3 columns]

<u>Validation de la dimension du vecteur de caractéristiques des images (attendu: 137)</u>

In [21]:
df.loc[0,'pca_features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(137,)

<u>Création d'une colonne pour chaque composante tout en conservant la colonne d'origine</u>

In [22]:
# Création d'une liste de colonnes à ajouter
new_columns = []

# Ajout des 137 composantes du vecteur en colonnes individuelles
num_components = 137

for i in range(num_components):
    new_columns.append(pd.Series(df['pca_features'].apply(lambda x: x[i] if isinstance(x, np.ndarray) and i < len(x) else np.nan), name=f'pca_feature_{i+1}'))

# Utilisation pd.concat pour ajouter toutes les colonnes à la fois
complete_df = pd.concat([df] + new_columns, axis=1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Dimension du dataframe</u>

In [23]:
complete_df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(100, 140)

<u>Elimination de la colonne 'pca_features'<u>

In [24]:
cloud_df = complete_df.drop('pca_features', axis=1)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Affichage des 5 premières lignes
cloud_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ... pca_feature_137
0   s3://p8-data-cloud/Fruits_Cloud/Lime/262_100.jpg  ...        -0.43325
1   s3://p8-data-cloud/Fruits_Cloud/Pear/r_7_100.jpg  ...        -0.43325
2  s3://p8-data-cloud/Fruits_Cloud/Lime/r_52_100.jpg  ...        -0.43325
3  s3://p8-data-cloud/Fruits_Cloud/Apple Golden 2...  ...        -0.43325
4  s3://p8-data-cloud/Fruits_Cloud/Walnut/12_100.jpg  ...        -0.43325

[5 rows x 139 columns]

<u>Comptage du nombre d'images de fruits/légumes par classe</u>

In [26]:
cloud_df['label'].value_counts()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Lime              10
Pear              10
Apple Golden 2    10
Walnut            10
Onion white       10
Tomato 1          10
Kiwi              10
Clementine        10
Cauliflower       10
Corn              10
Name: label, dtype: int64

<u>Sauvegarde des données sous format csv<u>

In [27]:
# Chemin S3 pour l'enregistrement du fichier CSV
path_s3 = 's3://p8-data-cloud/Results/P8.csv'

# Enregistrement du DataFrame en tant que fichier CSV sur S3
cloud_df.to_csv(path_s3, index=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Chemin S3 du fichier CSV
path_s3 = 's3://p8-data-cloud/Results/P8.csv'

# Lecture le fichier CSV depuis S3
cloud_df = pd.read_csv(path_s3)

# Affichage des 5 premières lignes
cloud_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ... pca_feature_137
0   s3://p8-data-cloud/Fruits_Cloud/Lime/262_100.jpg  ...        -0.43325
1   s3://p8-data-cloud/Fruits_Cloud/Pear/r_7_100.jpg  ...        -0.43325
2  s3://p8-data-cloud/Fruits_Cloud/Lime/r_52_100.jpg  ...        -0.43325
3  s3://p8-data-cloud/Fruits_Cloud/Apple Golden 2...  ...        -0.43325
4  s3://p8-data-cloud/Fruits_Cloud/Walnut/12_100.jpg  ...        -0.43325

[5 rows x 139 columns]