# Déployez un modèle dans le cloud


# Sommaire :

**1. Préambule**<br />
&emsp;1.1 Problématique<br />
&emsp;1.2 Objectifs dans ce projet<br />
&emsp;1.3 Déroulement des étapes du projet<br />
**2. Choix techniques généraux retenus**<br />
&emsp;2.1 Calcul distribué avec pyspark<br />
&emsp;2.2 Transfert Learning<br />
**3. Déploiement de la solution sur le cloud avec JupyterHub**<br />
&emsp;3.1 Import des librairies<br />
&emsp;3.2 Définition des PATH pour charger les images et enregistrer les résultats<br />
&emsp;3.3 Traitement des données<br />
&emsp;&emsp;3.3.1 Chargement des données<br />
&emsp;&emsp;3.3.2 Préparation du modèle<br />
&emsp;&emsp;3.3.3 Définition du processus de chargement des images et application <br />
&emsp;&emsp;&emsp;&emsp;&emsp;de leur featurisation à travers l'utilisation de pandas UDF<br />
&emsp;&emsp;3.3.4 Réduction de dimension PCA <br />
&emsp;&emsp;3.3.5 Exécution des actions d'extractions de features<br />
&emsp;3.6 Chargement des données enregistrées et validation du résultat<br />


# 1. Préambule

## 1.1 Problématique

La très jeune start-up de l'AgriTech, nommée "**Fruits**!", <br />
cherche à proposer des solutions innovantes pour la récolte des fruits.

La volonté de l’entreprise est de préserver la biodiversité des fruits <br />
en permettant des traitements spécifiques pour chaque espèce de fruits <br />
en développant des robots cueilleurs intelligents.

La start-up souhaite dans un premier temps se faire connaître en mettant <br />
à disposition du grand public une application mobile qui permettrait aux <br />
utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

Pour la start-up, cette application permettrait de sensibiliser le grand public <br /> 
à la biodiversité des fruits et de mettre en place une première version du moteur <br />
de classification des images de fruits.

De plus, le développement de l’application mobile permettra de construire <br />
une première version de l'architecture **Big Data** nécessaire.

## 1.2 Objectifs dans ce projet

1. Développer une première chaîne de traitement des données qui <br />
   comprendra le **preprocessing** et une étape de **réduction de dimension**.
2. Tenir compte du fait que <u>le volume de données va augmenter <br />
   très rapidement</u> après la livraison de ce projet, ce qui implique de:
 - Déployer le traitement des données dans un environnement **Big Data**
 - Développer les scripts en **pyspark** pour effectuer du **calcul distribué**

# 2 Choix des techniques généraux retenus 

## 2.1 Calcul distribué

L’énoncé du projet nous impose de développer des scripts en **pyspark** <br />
afin de <u>prendre en compte l’augmentation très rapide du volume <br />
de donné après la livraison du projet</u>.

Pour comprendre rapidement et simplement ce qu’est **pyspark** <br />
et son principe de fonctionnement, nous vous conseillons de lire <br />
cet article : [PySpark : Tout savoir sur la librairie Python](https://datascientest.com/pyspark)

<u>Le début de l’article nous dit ceci </u>:<br />
« *Lorsque l’on parle de traitement de bases de données sur python, <br />
on pense immédiatement à la librairie pandas. Cependant, lorsqu’on a <br />
affaire à des bases de données trop massives, les calculs deviennent trop lents.<br />
Heureusement, il existe une autre librairie python, assez proche <br />
de pandas, qui permet de traiter des très grandes quantités de données : PySpark.<br />
Apache Spark est un framework open-source développé par l’AMPLab <br />
de UC Berkeley permettant de traiter des bases de données massives <br />
en utilisant le calcul distribué, technique qui consiste à exploiter <br />
plusieurs unités de calcul réparties en clusters au profit d’un seul <br />
projet afin de diviser le temps d’exécution d’une requête.<br />
Spark a été développé en Scala et est au meilleur de ses capacités <br />
dans son langage natif. Cependant, la librairie PySpark propose de <br />
l’utiliser avec le langage Python, en gardant des performances <br />
similaires à des implémentations en Scala.<br />
Pyspark est donc une bonne alternative à la librairie pandas lorsqu’on <br />
cherche à traiter des jeux de données trop volumineux qui entraînent <br />
des calculs trop chronophages.* »

Comme nous le constatons, **pySpark** est un moyen de communiquer <br />
avec **Spark** via le langage **Python**.<br />
**Spark**, quant à lui, est un outil qui permet de gérer et de coordonner <br />
l'exécution de tâches sur des données à travers un groupe d'ordinateurs. <br />
<u>Spark (ou Apache Spark) est un framework open source de calcul distribué <br />
in-memory pour le traitement et l'analyse de données massives</u>.

Un autre [article très intéressant et beaucoup plus complet pour <br />
comprendre le **fonctionnement de Spark**](https://www.veonum.com/apache-spark-pour-les-nuls/), ainsi que le rôle <br />
des **Spark Session** que nous utiliserons dans ce projet.

<u>Voici également un extrait</u>:

*Les applications Spark se composent d’un pilote (« driver process ») <br />
et de plusieurs exécuteurs (« executor processes »). Il peut être configuré <br />
pour être lui-même l’exécuteur (local mode) ou en utiliser autant que <br />
nécessaire pour traiter l’application, Spark prenant en charge la mise <br />
à l’échelle automatique par une configuration d’un nombre minimum <br />
et maximum d’exécuteurs.*

![Schéma de Spark](img/spark-schema.png)

*Le driver (parfois appelé « Spark Session ») distribue et planifie <br />
les tâches entre les différents exécuteurs qui les exécutent et permettent <br />
un traitement réparti. Il est le responsable de l’exécution du code <br />
sur les différentes machines.

Chaque exécuteur est un processus Java Virtual Machine (JVM) distinct <br />
dont il est possible de configurer le nombre de CPU et la quantité de <br />
mémoire qui lui est alloué. <br />
Une seule tâche peut traiter un fractionnement de données à la fois.*

Dans les deux environnements (Local et Cloud) nous utiliserons donc **Spark** <br />
et nous l’exploiterons à travers des scripts python grâce à **PySpark**.

Dans la <u>version locale</u> de notre script nous **simulerons <br />
le calcul distribué** afin de valider que notre solution fonctionne.<br />
Dans la <u>version cloud</u> nous **réaliserons les opérations sur un cluster de machine**.

## 2.2 Transfert Learning

L'énoncé du projet nous demande également de <br />
réaliser une première chaîne de traitement <br />
des données qui comprendra le preprocessing et <br />
une étape de réduction de dimension.

Il est également précisé qu'il n'est pas nécessaire <br />
d'entraîner un modèle pour le moment.

Nous décidons de partir sur une solution de **transfert learning**.

Simplement, le **transfert learning** consiste <br />
à utiliser la connaissance déjà acquise <br />
par un modèle entraîné (ici **MobileNetV2**) pour <br />
l'adapter à notre problématique.

Nous allons fournir au modèle nos images, et nous allons <br />
<u>récupérer l'avant dernière couche</u> du modèle.<br />
En effet la dernière couche de modèle est une couche softmax <br />
qui permet la classification des images ce que nous ne <br />
souhaitons pas dans ce projet.

L'avant dernière couche correspond à un **vecteur <br />
réduit** de dimension (1,1,1280).

Cela permettra de réaliser une première version du moteur <br />
pour la classification des images des fruits.

**MobileNetV2** a été retenu pour sa <u>rapidité d'exécution</u>, <br />
particulièrement adaptée pour le traitement d'un gros volume <br />
de données ainsi que la <u>faible dimensionnalité du vecteur <br />
de caractéristique en sortie</u> (1,1,1280)

# 3. Déploiement de la solution sur le cloud avec JupyterHub

# 3.1 Import des librairies

In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.ml.feature import PCA, VectorAssembler, VectorIndexer
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

Starting Spark application


The code failed because of a fatal error:
	Session 0 did not start up in 60 seconds..

Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.


# 3.2 Définition des PATH pour charger les images et enregistrer les résultats

In [2]:
PATH = 's3://workspace8'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/results/'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

The code failed because of a fatal error:
	Session 0 did not start up in 60 seconds..

Some things to try:
a) Make sure Spark has enough available resources for Jupyter to create a Spark context.
b) Contact your Jupyter administrator to make sure the Spark magics library is configured correctly.
c) Restart the kernel.


# 3.3 Traitement des données

# 3.3.1 Chargement des données

In [7]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
images.show(5)

In [8]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+---------------------------------------------+----------+
|path                                         |label     |
+---------------------------------------------+----------+
|s3://workspace8/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://workspace8/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://workspace8/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://workspace8/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://workspace8/Test/Watermelon/r_95_100.jpg |Watermelon|
+---------------------------------------------+----------+
only showing top 5 rows

None

In [7]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+----------+
|                path|   modificationTime|length|             content|     label|
+--------------------+-------------------+------+--------------------+----------+
|s3://workspace8/T...|2023-11-03 15:11:41|  7353|[FF D8 FF E0 00 1...|Watermelon|
|s3://workspace8/T...|2023-11-03 15:12:20|  7350|[FF D8 FF E0 00 1...|Watermelon|
|s3://workspace8/T...|2023-11-03 15:12:46|  7349|[FF D8 FF E0 00 1...|Watermelon|
|s3://workspace8/T...|2023-11-03 15:12:28|  7348|[FF D8 FF E0 00 1...|Watermelon|
|s3://workspace8/T...|2023-11-03 15:12:42|  7328|[FF D8 FF E0 00 1...|Watermelon|
+--------------------+-------------------+------+--------------------+----------+
only showing top 5 rows

In [9]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+---------------------------------------------+----------+
|path                                         |label     |
+---------------------------------------------+----------+
|s3://workspace8/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://workspace8/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://workspace8/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://workspace8/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://workspace8/Test/Watermelon/r_95_100.jpg |Watermelon|
+---------------------------------------------+----------+
only showing top 5 rows

None

In [13]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 3.3.2 Préparation du modèle 

In [14]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 3.3.3 Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

In [15]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [7]:
# 1280 features sont extraites par images du modèle MobileNetV2
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

NameError: name 'images' is not defined

In [16]:
features_df.select('path').show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|                path|
+--------------------+
|s3://workspace8/T...|
|s3://workspace8/T...|
|s3://workspace8/T...|
|s3://workspace8/T...|
|s3://workspace8/T...|
+--------------------+
only showing top 5 rows

In [17]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://workspace8/results

In [22]:
ud_f = udf(lambda r: Vectors.dense(r), VectorUDT())
df = features_df.withColumn('features', ud_f('features'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 3.3.4 Réduction de dimension PCA

In [23]:
# Passage de dimensions de 1280 à 1000 
pca = PCA(k=1000, inputCol="features", outputCol="reduction_features")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
pca = pca.fit(df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# 1 % de perte d'information avec 1000 dimensions
sum(pca.explainedVariance[:1000])

In [None]:
# Passage de dimensions de 1280 à 400 
pca = PCA(k=400, inputCol="features", outputCol="reduction_features")

In [None]:
pca = pca.fit(df)

In [None]:
# 5 % de perte d'information avec 400 dimensions
sum(pca.explainedVariance[:400])

In [25]:
ps_pca = pca.transform(df)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
ps_pca.select('reduction_features').show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+
|  reduction_features|
+--------------------+
|[-1.5997433737116...|
|[-3.8210221647130...|
|[-3.7545830196284...|
|[-2.8491111407258...|
|[-2.8817704189352...|
|[-3.3311643184080...|
|[1.13456750300022...|
|[-1.8620623894754...|
|[-3.6497412501985...|
|[-5.3369387630205...|
+--------------------+
only showing top 10 rows

In [31]:
np.array(pca.explainedVariance)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

array([0.11025705, 0.08229517, 0.06893004, 0.05263199, 0.03212438,
       0.0294074 , 0.0259836 , 0.02564273, 0.02107345, 0.01985178,
       0.01610656, 0.01571898, 0.01496373, 0.01350037, 0.01288464,
       0.01197522, 0.0119051 , 0.01088079, 0.01021353, 0.01000312,
       0.00933051, 0.00857012, 0.0082015 , 0.00771912, 0.00729559,
       0.0071514 , 0.00679508, 0.00658153, 0.00620601, 0.00601148,
       0.0057978 , 0.00563653, 0.00526852, 0.00501938, 0.00497602,
       0.00479603, 0.00462561, 0.00438635, 0.00430463, 0.00404711,
       0.0039914 , 0.00387078, 0.00381646, 0.00370534, 0.00367309,
       0.00356539, 0.00349344, 0.00331452, 0.00325374, 0.00310388,
       0.00307751, 0.00301878, 0.00293913, 0.00285057, 0.00282252,
       0.00273707, 0.00264228, 0.00257282, 0.00252379, 0.00251953,
       0.00245173, 0.00236026, 0.0023477 , 0.00229504, 0.00226674,
       0.00217708, 0.00212263, 0.00206343, 0.0019949 , 0.00197748,
       0.00195343, 0.00191285, 0.00189625, 0.00184476, 0.00180

In [29]:
fig, axes = plt.subplots(figsize=(8,8))
axes.set_xticks(np.arange(100))
axes.plot(np.arange(1,101), np.cumsum(np.array(pca.explainedVariance)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[<matplotlib.lines.Line2D object at 0x7fc898aaef50>]

In [30]:
df_pcatr = ps_pca.select(['path','label',ps_pca['reduction_features'].cast('string')])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 3.3.5 Exécution des actions d'extractions de feature

In [32]:
df_pcatr.write.mode("overwrite").parquet('s3://workspace8/results/')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 3.6 Chargement des données enregistrées et validation du résultat

In [33]:
df = pd.read_parquet('s3://workspace8/results/')
df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                    path  ...                                 reduction_features
0       s3://workspace8/Test/Cucumber Ripe/r_165_100.jpg  ...  [1.6773020002289143,1.7483399145361351,-1.3805...
1       s3://workspace8/Test/Cucumber Ripe/r_161_100.jpg  ...  [1.301202286620246,1.2825054892830472,-3.26622...
2             s3://workspace8/Test/Raspberry/108_100.jpg  ...  [2.074106398725068,5.335605474536186,3.1078432...
3             s3://workspace8/Test/Pineapple/189_100.jpg  ...  [-4.562556825992316,5.331880594886458,-0.83085...
4         s3://workspace8/Test/Cauliflower/r_299_100.jpg  ...  [-3.9330013962328554,4.3481373459280945,1.1008...
...                                                  ...  ...                                                ...
15986         s3://workspace8/Test/Corn Husk/233_100.jpg  ...  [-2.882837935137558,6.80602655848726,3.7090769...
15987  s3://workspace8/Test/Banana Lady Finger/r_51_1...  ...  [-3.282862304118072,0.59879491163