<h3 align="center"><font size="6.5">Projet 8 : Déployez un modèle dans le cloud</font></h3>

<h3 align="center"><font size="5">Création des vecteurs sur AWS</font></h3>

* [Chapitre 1 : Préparation des données](#chapter1)
    * [Librairies et données](#section_1_1)
    * [Dataframe](#section_1_2)
       
* [Chapitre 2 : Modélisation](#chapter2)
    * [Création de fonction](#section_2_1)
    * [Réduction dimensionnelle](#section_2_2)
    * [Sauvegarde des données](#section_2_3)

<a class="anchor" id="chapter1"></a>
# Chapitre 1 : Préparation des données
Nous allons reproduire notre travail de reconnaissance d'images de fruits et légumes sur le cloud AWS. Le jeu de données est le même mais celui-ci se trouve sur les serveurs S3.

<a class="anchor" id="section_1_1"></a>
## Librairies et données

Avant de démarrer le script, il est fondamental de changer le Kernel en PySpark pour que le script puisse s'éxecuter sur le cloud. Nous allons executer une cellule vide qui va alors afficher des informations liées à la session Spark qui est lancée.

In [1]:
# Commencer par changer le kernel en "Pyspark" dans le menu "Kernel"

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1686837425960_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1686837425960_0002,pyspark,idle,Link,Link,✔


In [3]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.ml.feature import PCA, RobustScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.functions import vector_to_array
from pyspark.ml import Pipeline

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
PATH = "s3://fk-p8-fruits"

PATH_Data = PATH+'/test'

PATH_Result = PATH+'/Results'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<a class="anchor" id="section_1_2"></a>
## Dataframe
Nous allons charger les données dans un dataframe pandas UDF dans un format binaire pour offrir plus de souplesse dans le prétraitement des images. Nous ajoutons comme option qu'il faut charger l'ensemble eds images au format jpg et que nous chargeons toutes les données des sous dossiers du dossier ciblé.

In [6]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<a class="anchor" id="chapter2"></a>
# Chapitre 2 : Modélisation
<a class="anchor" id="section_2_1"></a>
## Création de fonctions
Nous allons utiliser le modèle MobileNetV2 qui est un modèle conçu pour classifier des images selon 1000 classes. Ce modèle présente l'avantage d'être plus rapide à éxecuter que le modèle VGG16. Comme nous ne voulons pas de la classification (dernière couche), nous créeons un modèle qui va exclure la dernière couche (Transfer learning).

En revanche, nous spécifions que les données d'entrées doivent avoir une dimension de 224x224x3, ce qui est la dimension par défaut pour le modèle MobileNetV2. Nous serons donc amené à redimensionner les images que nous allons utiliser.

In [8]:
model_initial = MobileNetV2(weights='imagenet',
                            include_top=True,
                            input_shape=(224, 224, 3))

model_transfer_learning = Model(inputs=model_initial.input,
                                outputs=model_initial.layers[-2].output)

model_transfer_learning.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [9]:
broadcast_weights = sc.broadcast(model_transfer_learning.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
def model_fn():
    """
    Retourne un modèle MobileNetV2 où la dernière couche a été retirée
    et avec une diffusion des poids du modèle pré-entraîné.
    """
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
def pretraitement(content):
    """
    Prétraitement des images brut pour les mettre à la bonne dimension (224,224,3).
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(pretraitement))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                                is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches. This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



Nous créeons un nouveau dataframe pandas udf basé sur le dataframe "images". Les deux premières colonnes représentent les deux colonnes du dataframe pandas udf "images" ("path" et "label"). La troisème colonne représente le résultat de l'application @pandas_udf (featurization).

In [14]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<a class="anchor" id="section_2_2"></a>
## Réduction dimensionnelle
En appliquant le modèle MobilenNetV2, les features comportent 1280 valeurs. Nous allons réduire ce nombre grâce a une ACP. Nous allons d'abord normaliser les données grâce à un RobustScaler puis effectuer une ACP.

Pour ce faire, nous devons convertir chaque feature, qui sont des arrays en vecteurs pour que la normalisation et l'ACP puisse se faire. Nous avons testé en local et nous avons déterminé que pour expliquer 90 % des données, nous pouvions utiliser 67 composantes principales.

In [15]:
arr_to_vect = udf(lambda a:Vectors.dense(a), VectorUDT())
vect_features_df = features_df.withColumn("vecteur", arr_to_vect("features"))

scaler = RobustScaler(inputCol="vecteur", outputCol="scaled_features")

pca = PCA(k=67, inputCol="vecteur", outputCol="pca_features")

pipeline = Pipeline(stages = [scaler, pca]).fit(vect_features_df)

df_pca = pipeline.transform(vect_features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
data_to_keep = df_pca.select("path","label", vector_to_array("pca_features").alias("pca_features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
data_to_keep.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- pca_features: array (nullable = false)
 |    |-- element: double (containsNull = false)

<a class="anchor" id="section_2_3"></a>
## Sauvegarde des données
Nous enregistrons les données au format "parquet" et nous pourrons réimporter les résultats pour les lire avec un pandas dataframe.

In [18]:
data_to_keep.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                       pca_features
0  s3://fk-p8-fruits/test/Apple Braeburn/r_39_100...  ...  [-3.5176484346403947, -17.95835101090051, -1.6...
1  s3://fk-p8-fruits/test/Apple Crimson Snow/89_1...  ...  [-1.0644522912832493, -11.962073301113843, -1....
2  s3://fk-p8-fruits/test/Apple Crimson Snow/94_1...  ...  [-1.671808884682268, -12.906525166701519, -1.1...
3   s3://fk-p8-fruits/test/Apple Braeburn/34_100.jpg  ...  [-3.2354248624178306, -17.73814823581837, -1.5...
4  s3://fk-p8-fruits/test/Apple Granny Smith/32_1...  ...  [1.82602044706043, -5.1924175680169204, -13.69...

[5 rows x 3 columns]

In [21]:
df.loc[0,'pca_features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(67,)

In [22]:
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(373, 3)

In [23]:
df.info()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 373 entries, 0 to 372
Data columns (total 3 columns):
 #   Column        Non-Null Count  Dtype 
---  ------        --------------  ----- 
 0   path          373 non-null    object
 1   label         373 non-null    object
 2   pca_features  373 non-null    object
dtypes: object(3)
memory usage: 8.9+ KB

Nous exportons les résultats sous forme de fichier csv qui sera utilisés pour réaliser en local une analyse de l'efficacité du modèle MobileNetV2.

In [24]:
df.to_csv(PATH+"/resultats_featurize.csv")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…