
# <font size="+3"><span style='color:#861141'> **P8 - Déployez un modèle dans le cloud** </span></font>


<a id='strat_spark_session'></a>

---
---

# <span style='background:#861141'><span style='color:white'>**Starting Spark Session** </span></span>

In [None]:
# L'exécution de cette cellule démarre l'application Spark

<u>We create the **SparkConext** under the varaible "**sc**"</u> :

Let's display information about the spark session :

In [3]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1674810531637_0001,pyspark,idle,Link,Link,,✔


<a id='LOADING_LIBRARIES'></a>

---

---

# <span style='background:#861141'><span style='color:white'>**Loading de libraries** </span></span>

In [1]:
# File system management
import os
import sys
from pathlib import Path
import io
import glob

# Data manipulation
import numpy as np
import pandas as pd
import random
import shutil

# Image manipulation
from PIL import Image

# Tensorflow
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

# Pyspark
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors, VectorUDT

2023-08-25 09:38:03.845132: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


<a id='preambule'></a>

---
---

# <span style='background:#861141'><span style='color:white'>**Préambule** </span></span>


La très jeune start-up de l'AgriTech, nommée **"Fruits!", cherche à proposer des solutions innovantes pour la récolte des fruits**.

La volonté de l’entreprise est de préserver la biodiversité des fruits en permettant des traitements spécifiques pour chaque espèce de fruits en développant des robots cueilleurs intelligents.

La start-up souhaite dans un premier temps se faire connaître en mettant à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.
Pour la start-up, cette application permettrait de sensibiliser le grand public à la biodiversité des fruits et de mettre en place une première version du moteur de classification des images de fruits.
De plus, le développement de l’application mobile permettra de construire une première version de l'architecture Big Data nécessaire.

**Objectifs dans ce projet**

* **Développer une première chaîne de traitement des données qui comprendra le preprocessing et une étape de réduction de dimension**. Tenir compte du fait que le volume de données va augmenter très rapidement après la livraison de ce projet, ce qui implique de:
  * Déployer le traitement des données dans un **environnement Big Data**
  * Développer les scripts en **pyspark** pour effectuer du **calcul distribué**

Le projet va être réalisé **en 2 phases**, dans deux environnements différents.
* Nous allons dans une première phase développer et exécuter notre code en **local**, en travaillant sur un nombre limité d'images à traiter.
* Une fois les choix techniques validés, nous **déploierons** notre solution dans un **environnement Big Data en mode distribué**.
Ce notebook correspond à la déuxieme phase.

Un alternant a formalisé un document dans lequel il teste une première approche dans un environnement Big Data. Le notebook réalisé par l’alternant servira de point de départ pour construire une partie de la chaîne de traitement des données.


**Mission**
* Reprendre les travaux réalisés par l’alternant et de compléter la chaîne de traitement avec une étape de **réduction de dimension**.
* Il n’est pas nécessaire d’entraîner un modèle pour le moment.
* L’important est de mettre en place les premières briques de traitement qui serviront lorsqu’il faudra passer à l’échelle en termes de volume de données !

<a id='exploration_donnees'></a>

---
---

# <span style='background:#861141'><span style='color:white'>**Exploration des données** </span></span>


Nous accédons directement à nos données sur S3 comme si elles étaient stockées localement.

In [4]:
# Define the folder containing the files with the project data
s3_P8 = "s3://rsp-oc-p8-fruits/"
ATH_DataTest = s3_P8+'Test/'
PATH_Result = s3_P8 + "Results/"


print('PATH Project:     '+\
      s3_P8+'\n\nPATH_DataTest:    '+\
      PATH_DataTest+'\n\nPATH_Result:    '+\
      PATH_Result)

PATH Project:     /home/raquelsp/Documents/Openclassrooms/P8_Fruits_modele_cloud/

PATH_DataTrain:   /home/raquelsp/Documents/Openclassrooms/P8_Fruits_modele_cloud/P8_source/fruits/fruits-360_dataset/fruits-360/Training/

PATH_DataTest:    /home/raquelsp/Documents/Openclassrooms/P8_Fruits_modele_cloud/P8_source/fruits/fruits-360_dataset/fruits-360/Test/


<a id='traitement_donnees'></a>

---
---

# <span style='background:#861141'><span style='color:white'>**Traitement des données** </span></span>


<a id='chargement_donnees'></a>

## <span style='background:#7a1d2c'><span style='color:white'>Chargement des données</span></span>

* Les images sont chargées au format binaire, ce qui offre, plus de souplesse dans la façon de prétraiter les images.
* Seuelement les fichiers dont l'extension est **jpg** seront chargés.
* Les fichiers contenus contenus dans les sous-dossiers du dossier communiqué seront également chargés.

In [16]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_DataTest)

<u>Affichage des 5 premières images contenant</u> :
 - le path de l'image
 - la date et heure de sa dernière modification
 - sa longueur
 - son contenu encodé en valeur hexadécimal

In [None]:
images.show(5)

<u>Seulement le **path** de l'image est conservé, une colonne contenant les **labels** de chaque image est ajouté</u> :

In [17]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+----------------------------------------------------------------------------------------------------------+---------+
|path                                                                                                      |label    |
+----------------------------------------------------------------------------------------------------------+---------+
|file:/home/raquelsp/Documents/Openclassrooms/P8_Fruits_modele_cloud/P8_travail/Test1/Raspberry/199_100.jpg|Raspberry|
|file:/home/raquelsp/Documents/Openclassrooms/P8_Fruits_modele_cloud/P8_travail/Test1/Raspberry/200_100.jpg|Raspberry|
|file:/home/raquelsp/Documents/Openclassrooms/P8_Fruits_modele_cloud/P8_travail/Test1/Raspberry/81_100.jpg |Raspberry|
|file:/home/raquelsp/Documents/Openclassrooms/P8_Fruits_modele_cloud/P8_trav

<a id='preparation_modele'></a>

## <span style='background:#7a1d2c'><span style='color:white'>Préparation du modèle</span></span>

On a décidé de travailler avec du transfert learning. Le transfert learning consiste à utiliser la connaissance déjà acquise par un modèle entraîné (ici **MobileNetV2**) pour l'adapter à notre problématique.
Nous allons fournir au modèle nos images, et nous allons récupérer l'avant dernière couche du modèle. Cela permettra de réaliser une première version du moteur pour la classification des images des fruits.

In [18]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

2023-08-25 09:38:20.322354: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-08-25 09:38:20.323222: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


In [19]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [21]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [None]:
new_model.summary()

<u>Mettons cela sous forme de fonction</u> :

In [22]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

<a id='chargement_featurisation_udf'></a>

## <span style='background:#7a1d2c'><span style='color:white'>Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF</span></span>

Le notebook définit la logique par étapes, jusqu'à Pandas UDF.

<u>L'empilement des appels est la suivante</u> :

- Pandas UDF
  - featuriser une série d'images pd.Series
   - prétraiter une image

In [23]:
# Resize images to 224x224
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

# Featurize images and return a series of vectors (flattened tensors)
def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param: content_series_iter, This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



<a id='execution_extraction_feat'></a>

## <span style='background:#7a1d2c'><span style='color:white'>Exécution des actions d'extraction de features</span></span>

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), peuvent rencontrer des erreurs de type Out Of Memory (OOM).
Dans le cas où de teller erreurs apparaitrent, la ligne de code dans la cellule ci-dessous, permets de retuire la taille du lot Arrow via 'maxRecordsPerBatch'.

In [24]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.<br />

In [25]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

<u>Rappel du PATH où seront inscrits les fichiers au format "**parquet**" <br />
contenant nos résultats, à savoir, un DataFrame contenant 3 colonnes</u> :
 1. Path des images
 2. Label de l'image
 3. Vecteur de caractéristiques de l'image

In [26]:
print(PATH_Result)

/home/raquelsp/Documents/Openclassrooms/P8_Fruits_modele_cloud/P8_travail/Results_local/


In [27]:
features_df.show(5)

2023-08-25 09:38:23.204958: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-08-25 09:38:24.228809: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-08-25 09:38:24.231245: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


+--------------------+------------------+--------------------+
|                path|             label|            features|
+--------------------+------------------+--------------------+
|file:/home/raquel...|         Raspberry|[0.29254088, 1.06...|
|file:/home/raquel...|        Strawberry|[2.20994, 0.09814...|
|file:/home/raquel...|        Strawberry|[1.6356167, 0.0, ...|
|file:/home/raquel...|             Peach|[0.13757613, 0.0,...|
|file:/home/raquel...|Tomato not Ripened|[0.0, 0.4384215, ...|
+--------------------+------------------+--------------------+
only showing top 5 rows



                                                                                

<a id='PCA'></a>

## <span style='background:#7a1d2c'><span style='color:white'>Réduction des dimensions via PCA</span></span>

<a id='preparation_donnees_pca'></a>

### <font size="+2" color="#63202b"><b>Préparation des données<b></font><br><a name="Tokenization"></a>

In [28]:
# First step is to convert our features arrays into vectors
array_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df_with_vectors = features_df.select(
    features_df["path"],
    features_df["label"],
    features_df["features"],
    array_to_vector_udf(features_df["features"]).alias("vectorFeatures"),
)
# show the 5 first values :
df_with_vectors.show(5)

2023-08-25 09:38:26.977887: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-08-25 09:38:28.010429: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  SSE4.1 SSE4.2 AVX AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-08-25 09:38:28.012499: I tensorflow/core/common_runtime/process_util.cc:146] Creating new thread pool with default inter op setting: 2. Tune using inter_op_parallelism_threads for best performance.


+--------------------+------------------+--------------------+--------------------+
|                path|             label|            features|      vectorFeatures|
+--------------------+------------------+--------------------+--------------------+
|file:/home/raquel...|         Raspberry|[0.29254088, 1.06...|[0.29254087805747...|
|file:/home/raquel...|        Strawberry|[2.20994, 0.09814...|[2.20993995666503...|
|file:/home/raquel...|        Strawberry|[1.6356167, 0.0, ...|[1.63561666011810...|
|file:/home/raquel...|             Peach|[0.13757613, 0.0,...|[0.13757613301277...|
|file:/home/raquel...|Tomato not Ripened|[0.0, 0.4384215, ...|[0.0,0.4384214878...|
+--------------------+------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [30]:
# Second step is to scale the data before applying the PCA process
scaler = StandardScaler(inputCol="vectorFeatures",
                        outputCol="scaledFeatures",
                        withMean=True, withStd=True
                        ).fit(df_with_vectors)

# when we transform the dataframe, the old feature will still remain in it
df_scaled = scaler.transform(df_with_vectors)
# show the 5 first values :
df_scaled.show(5)

                                                                                

+--------------------+------------------+--------------------+--------------------+--------------------+
|                path|             label|            features|      vectorFeatures|      scaledFeatures|
+--------------------+------------------+--------------------+--------------------+--------------------+
|file:/home/raquel...|         Raspberry|[0.29254088, 1.06...|[0.29254087805747...|[-0.2479661194709...|
|file:/home/raquel...|        Strawberry|[2.20994, 0.09814...|[2.20993995666503...|[3.09032401311497...|
|file:/home/raquel...|        Strawberry|[1.6356167, 0.0, ...|[1.63561666011810...|[2.09039769878233...|
|file:/home/raquel...|             Peach|[0.13757613, 0.0,...|[0.13757613301277...|[-0.5177676881965...|
|file:/home/raquel...|Tomato not Ripened|[0.0, 0.4384215, ...|[0.0,0.4384214878...|[-0.7572947915732...|
+--------------------+------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



<a id='reduc_dim'></a>

### <font size="+2" color="#63202b"><b>Réduction de dimensions via PCA<b></font><br><a name="Tokenization"></a>

In [33]:
# We will use the optimal number of components for explaining 95% of variance
# identified during the local development
n_components = 128

# Apply the PCA with n_components
pca = PCA(k=n_components, inputCol='scaledFeatures', outputCol='pcaFeatures')

model_pca = pca.fit(df_scaled)
df_pca = model_pca.transform(df_scaled)

# show the 5 first values :
df_pca.show(5)

23/08/25 09:41:53 WARN DAGScheduler: Broadcasting large task binary with size 1352.1 KiB
                                                                                

+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+
|                path|             label|            features|      vectorFeatures|      scaledFeatures|         pcaFeatures|
+--------------------+------------------+--------------------+--------------------+--------------------+--------------------+
|file:/home/raquel...|         Raspberry|[0.29254088, 1.06...|[0.29254087805747...|[-0.2479661194709...|[-18.915042887680...|
|file:/home/raquel...|        Strawberry|[2.20994, 0.09814...|[2.20993995666503...|[3.09032401311497...|[-19.988846857832...|
|file:/home/raquel...|        Strawberry|[1.6356167, 0.0, ...|[1.63561666011810...|[2.09039769878233...|[-31.014341250894...|
|file:/home/raquel...|             Peach|[0.13757613, 0.0,...|[0.13757613301277...|[-0.5177676881965...|[-3.0016836093650...|
|file:/home/raquel...|Tomato not Ripened|[0.0, 0.4384215, ...|[0.0,0.4384214878...|[-0.7572947915732...|[3.81594761774

<a id='enreg_resultats'></a>

## <span style='background:#7a1d2c'><span style='color:white'>Enregistrement des résultats</span></span>

<u>Enregistrement des données traitées au format "**parquet**"</u> :

In [34]:
df_results = df_pca.select(["path", "label", "pcaFeatures"])

In [35]:
df_results.write.mode("overwrite").parquet(PATH_Result)

23/08/25 09:41:55 WARN DAGScheduler: Broadcasting large task binary with size 1547.3 KiB
                                                                                

<a id='validation'></a>

## <span style='background:#7a1d2c'><span style='color:white'>Chargement des données enregistrées et validation du résultat</span></span>

<u>On charge les données fraichement enregistrées dans un **DataFrame Pandas**</u> :

In [36]:
df = spark.read.parquet(PATH_Result).toPandas()

<u>On affiche les 5 premières lignes du DataFrame</u> :

In [37]:
df.head()

Unnamed: 0,path,label,pcaFeatures
0,file:/home/raquelsp/Documents/Openclassrooms/P...,Raspberry,"[-19.29533577800976, 2.810266574434207, 13.612..."
1,file:/home/raquelsp/Documents/Openclassrooms/P...,Raspberry,"[-15.856456261290123, 5.086231665583818, 1.284..."
2,file:/home/raquelsp/Documents/Openclassrooms/P...,Peach,"[-5.279906953913183, -8.526506257805092, -6.65..."
3,file:/home/raquelsp/Documents/Openclassrooms/P...,Mangostan,"[-2.343878066472722, -0.3323713964594619, -9.4..."
4,file:/home/raquelsp/Documents/Openclassrooms/P...,Peach,"[-2.823781675367395, -8.785769088483162, -3.59..."


<u>On valide que la dimension des features est bien equivalente au nombre de composantes rétenu (128)</u> :

In [38]:
np.array(df.loc[0, "pcaFeatures"]).shape

(128,)

et que le numéro d'images traitées est :

In [39]:
df.shape

(300, 3)

<a id='conclusions'></a>

---
---

# <span style='background:#861141'><span style='color:white'>**Conclusions** </span></span>


Ce notebook s'intègre dans la deuxième phase du projet qui consiste à **créer unn réel cluster de calculs**.
L'objectif était de pouvoir **anticiper une future augmentation de la charge de travail**.

Le choix retenu a été l'utilisation du prestataire de services **Amazon Web Services** qui nous permet de louer à la demande de la puissance de calculs, pour un coût tout à fait acceptable.
En plus d'être rapide et simple à mettre en place, nous avons la certitude du bon fonctionnement de la solution, celle-ci ayant été préalablement validé par les ingénieurs d'Amazon.

Nous avons en utilisé le **service EMR** (Plateforme As A Service PAAS) qui permet d'instancier plusieurs serveurs (un **cluster**) sur lesquels nous avons installé et configuré plusieurs programmes et librairies nécessaires au projet comme Spark, Hadoop, JupyterHub ainsi que la librairie TensorFlow.

Nous avons exécuté le notebook qui avait été validé en local.
Nous avons exécuté le traitement sur l'ensemble des images du dossier "Test".

Nous avons opté pour le service Amazon S3 pour stocker les données de notre projet. S3 offre, pour un faible coût, toutes les conditions dont nous avons besoin pour stocker
et exploiter de manière efficace nos données.
L'espace alloué est potentiellement illimité, mais les coûts seront fonction de l'espace utilisé.

Il sera **simple de faire face à une monté de la charge de travail** en redimensionnant simplement notre cluster de machines (horizontalement et/ou verticalement au besoin), **les coûts augmenteront proportionnellement** mais resteront nettement inférieurs aux coûts engendrés par l'achat de matériels ou par la location de serveurs dédiés.