# Project 8: Deploying a model in the cloud
## Big Data solution: AWS

### Project objectives

1. Develop a first data processing pipeline that includes
   **preprocessing** and a **dimensionality reduction** step.
2. Account for the fact that <u>the volume of data will grow
   very quickly</u> after this project is delivered, which means:
   - Deploying the data processing in a **Big Data** environment
   - Writing the scripts in **PySpark** to perform **distributed computations**

In [1]:
# Running this cell starts the Spark application

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1689743494016_0002,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
# Stop the SparkSession
spark.stop()

## Configuring and starting the Spark session

In [3]:
%%configure -f
{ "conf":{
          "spark.pyspark.python": "python3",
          "spark.pyspark.virtualenv.enabled": "true",
          "spark.pyspark.virtualenv.type":"native",
          "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
         }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1689743494016_0003,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [5]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1689743494016_0003,pyspark,idle,Link,Link,✔


### Installing the required dependencies

In [6]:
sc.install_pypi_package("pip==23.2")

Collecting pip==23.2
  Downloading https://files.pythonhosted.org/packages/02/65/f15431ddee78562355ccb39097bf9160a1689f2db40dc418754be98806a1/pip-23.2-py3-none-any.whl (2.1MB)
Installing collected packages: pip
  Found existing installation: pip 9.0.1
    Uninstalling pip-9.0.1:
      Successfully uninstalled pip-9.0.1
Successfully installed pip-23.2

In [7]:
sc.install_pypi_package("pandas==1.2.5")

Collecting pandas==1.2.5
  Downloading pandas-1.2.5-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (9.9 MB)
     ━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━ 9.9/9.9 MB 100.7 MB/s eta 0:00:00
Installing collected packages: pandas
Successfully installed pandas-1.2.5

DEPRECATION: astor file-astor-VERSION has a non-standard version number. pip 23.3 will enforce this behaviour change. A possible replacement is to upgrade to a newer version of astor or contact the author to suggest that they release a version with a conforming version number. Discussion can be found at https://github.com/pypa/pip/issues/12063

In [8]:
# Import the libraries

import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

In [9]:
import pandas as pd

### Defining the S3 paths

In [28]:
PATH_S3 = 's3://p8-data-flo/'
PATH_Data = 's3://p8-data-flo/Test/'
PATH_Result = 's3://p8-data-flo/Results/'
print('PATH_S3:      '+\
      PATH_S3+'\nPATH_Data:    '+\
      PATH_Data+'\nPATH_Result:  '+PATH_Result)

PATH_S3:      s3://p8-data-flo/
PATH_Data:    s3://p8-data-flo/Test/
PATH_Result:  s3://p8-data-flo/Results/
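The paths above are written out by hand, so the bucket name is repeated in each one. As a minimal sketch, the sub-paths could instead be derived from `PATH_S3`; the helper name `s3_join` is hypothetical and not part of the original notebook.

```python
import posixpath


def s3_join(base: str, *parts: str) -> str:
    """Join key segments under an S3 base URI and keep a trailing slash."""
    scheme, sep, rest = base.partition("://")
    joined = posixpath.join(rest, *parts)
    if not joined.endswith("/"):
        joined += "/"
    return f"{scheme}{sep}{joined}"


PATH_S3 = "s3://p8-data-flo/"
PATH_Data = s3_join(PATH_S3, "Test")
PATH_Result = s3_join(PATH_S3, "Results")

print(PATH_Data)
print(PATH_Result)
```

Changing the bucket then only requires editing `PATH_S3`.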

## Data processing

### Loading the data

In [17]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load('s3://p8-data-flo/Test/')

In [18]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8-data-flo/...|2023-07-18 17:55:57|  7353|[FF D8 FF E0 00 1...|
|s3://p8-data-flo/...|2023-07-18 17:55:58|  7350|[FF D8 FF E0 00 1...|
|s3://p8-data-flo/...|2023-07-18 17:55:58|  7349|[FF D8 FF E0 00 1...|
|s3://p8-data-flo/...|2023-07-18 17:55:57|  7348|[FF D8 FF E0 00 1...|
|s3://p8-data-flo/...|2023-07-18 17:56:28|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

In [19]:
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print directly and return None, so no print() wrapper is needed
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+----------------------------------------------+----------+
|path                                          |label     |
+----------------------------------------------+----------+
|s3://p8-data-flo/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p8-data-flo/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p8-data-flo/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p8-data-flo/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p8-data-flo/Test/Watermelon/r_95_100.jpg |Watermelon|
+----------------------------------------------+----------+
only showing top 5 rows
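The label is taken from the folder that contains each image, which is the second-to-last segment of the path once it is split on `/`. A plain-Python sketch of the same rule, where the function name `label_from_path` is hypothetical, makes this easy to check on a single path:

```python
def label_from_path(path: str) -> str:
    """Return the parent folder name: the second-to-last '/'-separated segment."""
    return path.split("/")[-2]


example = "s3://p8-data-flo/Test/Watermelon/r_106_100.jpg"
print(label_from_path(example))  # Watermelon
```

The rule assumes every image sits exactly one folder below its class directory, as in the paths shown above.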

## Transfer learning model (MobileNetV2)

In [20]:
# Instantiate the model and download the pretrained weights

model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [21]:
# Drop the final classification layer: the output becomes the 1,280-dimensional
# vector of the penultimate layer, used as a feature extractor for our data

new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [22]:
# Create a broadcast variable holding the model weights.
# It is accessible from every node of the Spark cluster,
# so the workers do not have to fetch the weights from the driver.

brodcast_weights = sc.broadcast(new_model.get_weights())

In [23]:
# Architecture of the neural network
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [24]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
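    # weights=None: the pretrained weights are loaded from the broadcast
    # variable below, so the executors do not need to download them again.
    model = MobileNetV2(weights=None,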
                        include_top=True,
                        input_shape=(224, 224, 3))
    
    for layer in model.layers:
        layer.trainable = False
        
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    
    new_model.set_weights(brodcast_weights.value)
    
    return new_model

In [25]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    
    return preprocess_input(arr)


def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    
    inputs = np.stack(content_series.map(preprocess))
    preds = model.predict(inputs)
    
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    
    output = [p.flatten() for p in preds]
    
    return pd.Series(output)



@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    Param content_series_iter: This argument is an iterator over batches of data, where each batch
    is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
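The `preprocess` function above relies on `preprocess_input` from `mobilenet_v2`, which rescales pixel values from [0, 255] to [-1, 1]. The NumPy sketch below reproduces only that scaling step on a synthetic image; the helper name `scale_like_mobilenet_v2` is hypothetical and is not a Keras function.

```python
import numpy as np


def scale_like_mobilenet_v2(pixels: np.ndarray) -> np.ndarray:
    """Map pixel values from [0, 255] to [-1, 1] (hypothetical helper)."""
    return pixels.astype("float32") / 127.5 - 1.0


# Synthetic 224x224 RGB image: left half black, right half white.
image = np.zeros((224, 224, 3), dtype="uint8")
image[:, 112:, :] = 255

scaled = scale_like_mobilenet_v2(image)
print(scaled.shape, scaled.min(), scaled.max())
```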

In [26]:
# Run the images through the MobileNetV2 network
# without the classification head: feature extraction only

features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )
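`featurize_udf` is a Scalar Iterator UDF: it receives an iterator of batches, loads the model once, and reuses it for every batch. The plain-Python sketch below illustrates that pattern with a stand-in model; `load_model`, `featurize_batches` and `LOAD_COUNT` are hypothetical names used only for illustration.

```python
from typing import Iterator, List

LOAD_COUNT = 0  # counts how many times the (fake) model is loaded


def load_model():
    """Stand-in for an expensive model load (hypothetical)."""
    global LOAD_COUNT
    LOAD_COUNT += 1
    return lambda batch: [2 * x for x in batch]


def featurize_batches(batches: Iterator[List[int]]) -> Iterator[List[int]]:
    """Load the model once, then reuse it for every batch of the iterator."""
    model = load_model()
    for batch in batches:
        yield model(batch)


results = list(featurize_batches(iter([[1, 2], [3], [4, 5, 6]])))
print(results)     # [[2, 4], [6], [8, 10, 12]]
print(LOAD_COUNT)  # 1
```

Three batches are processed but the model is loaded a single time, which is the cost the iterator form is meant to amortize.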

## Writing the features to Parquet
For each image (22,139 rows), the `features` column holds a vector of length 1,280.
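A rough estimate of the uncompressed size of the `features` column follows from these two numbers. The sketch below assumes 4 bytes per value (the UDF returns `array<float>`, i.e. 32-bit floats); the size on disk will differ because Parquet compresses the data.

```python
n_rows = 22_139       # number of images
n_features = 1_280    # length of each feature vector
bytes_per_float = 4   # assumption: 32-bit floats

raw_bytes = n_rows * n_features * bytes_per_float
print(f"{raw_bytes:,} bytes = {raw_bytes / 2**20:.1f} MiB")
# 113,351,680 bytes = 108.1 MiB
```

This kind of estimate is worth doing before collecting a Spark DataFrame onto the driver, since the whole result must fit in its memory.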

In [29]:
features_df.write.mode("append").parquet(PATH_Result)

In [49]:
df = spark.read.parquet("s3://p8-data-flo/Results/")

In [55]:
df.count()

22139

In [56]:
df.show(5)

+--------------------+--------------+--------------------+
|                path|         label|            features|
+--------------------+--------------+--------------------+
|s3://p8-data-flo/...|    Watermelon|[0.12482018, 0.02...|
|s3://p8-data-flo/...|    Watermelon|[0.03623737, 0.15...|
|s3://p8-data-flo/...|Pineapple Mini|[0.0, 4.4438276, ...|
|s3://p8-data-flo/...|    Watermelon|[0.015379993, 0.3...|
|s3://p8-data-flo/...|Pineapple Mini|[0.007994337, 4.5...|
+--------------------+--------------+--------------------+
only showing top 5 rows

In [57]:
# Create a copy as a pandas DataFrame
df_pandas = df.toPandas()

In [59]:
df_pandas.head(5)

                                               path  ...                                           features
0     s3://p8-data-flo/Test/Watermelon/r_90_100.jpg  ...  [0.12482018023729324, 0.026548253372311592, 0....
1     s3://p8-data-flo/Test/Watermelon/r_96_100.jpg  ...  [0.03623737022280693, 0.15497834980487823, 0.0...
2  s3://p8-data-flo/Test/Pineapple Mini/123_100.jpg  ...  [0.0, 4.4438276290893555, 0.02875959686934948,...
3    s3://p8-data-flo/Test/Watermelon/r_176_100.jpg  ...  [0.015379993245005608, 0.35308343172073364, 0....
4    s3://p8-data-flo/Test/Pineapple Mini/7_100.jpg  ...  [0.007994337007403374, 4.551527500152588, 0.0,...

[5 rows x 3 columns]

## PCA with PySpark
The Spark DataFrame `df` cannot be used as is: the `features` column must first be converted from an array to an ML vector.
PCA is applied to reduce the dimensionality of the feature vectors (1,280 values each).

In [73]:
from pyspark.ml.linalg import Vectors
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql import SparkSession


# UDF that converts arrays to dense vectors
def array_to_dense_vector(arr):
    return Vectors.dense(arr)

# Register the UDF. Note: the declared return type ArrayType(FloatType()) does not
# match the DenseVector returned, so the column stays an array in the schema.
array_to_dense_vector_udf = F.udf(array_to_dense_vector, ArrayType(FloatType()))

# Apply the UDF to replace the "features" column
df_pca = df.withColumn("features", array_to_dense_vector_udf("features"))

In [80]:
df_pca.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)

In [81]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType

# UDF that converts an array to a DenseVector, declared with the VectorUDT return type
array_to_dense_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

# Apply the UDF to replace the "features" column with DenseVectors
df_pca_1 = df.withColumn("features", array_to_dense_vector_udf("features"))

# The schema should now show "features" as a vector
df_pca_1.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)

In [86]:
from pyspark.ml.feature import PCA

# Extract the features and the labels from the DataFrame (collected to the driver)
features = np.stack(df_pca_1.select("features").collect())
labels = df_pca_1.select("label").rdd.flatMap(lambda x: x).collect()

# Fit the PCA
pca = PCA(k=35, inputCol="features", outputCol="pca_features")
model_pca = pca.fit(df_pca_1)
reduced_features = model_pca.transform(df_pca_1)
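To make the idea behind `PCA(k=35)` concrete, here is a small NumPy sketch of principal component analysis on synthetic data. It is not Spark's implementation, and details such as centering at transform time may differ; the function name `pca_fit` and the random data are assumptions made for illustration.

```python
import numpy as np


def pca_fit(X: np.ndarray, k: int):
    """Return the top-k principal axes and their explained-variance ratios."""
    Xc = X - X.mean(axis=0)                 # center each feature
    cov = np.cov(Xc, rowvar=False)          # feature-by-feature covariance
    eigvals, eigvecs = np.linalg.eigh(cov)  # eigenvalues in ascending order
    order = np.argsort(eigvals)[::-1][:k]   # indices of the k largest
    components = eigvecs[:, order]          # shape (n_features, k)
    ratios = eigvals[order] / eigvals.sum()
    return components, ratios


rng = np.random.default_rng(0)
# Synthetic data: 200 samples, 10 features with unequal variances.
X = rng.normal(size=(200, 10)) * np.arange(1, 11)

components, ratios = pca_fit(X, k=3)
reduced = (X - X.mean(axis=0)) @ components  # project onto the top 3 axes
print(reduced.shape, np.cumsum(ratios))
```

The explained-variance ratios returned here play the same role as `explainedVariance` on the fitted Spark model.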

In [87]:
# Variance explained by each principal component
variances = model_pca.explainedVariance.toArray()

# Cumulative explained variance
cumulative_variances = np.cumsum(variances)

print(cumulative_variances)

[0.10220291 0.18175722 0.24481569 0.29456436 0.33026391 0.35925439
 0.38692624 0.40992876 0.42989468 0.44922712 0.46595849 0.4807573
 0.49487187 0.50857714 0.52198632 0.53437568 0.54595938 0.55672304
 0.56657629 0.576275   0.58542725 0.59375493 0.6016978  0.6091446
 0.61631158 0.6233919  0.63015033 0.63636441 0.64246036 0.64830567
 0.65403929 0.65961538 0.66491309 0.66995843 0.67474832]
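The 35 components retain about 67.5% of the variance. A short sketch, using the cumulative values printed above, shows how to read off the number of components needed for a given threshold; the function name `components_for` is hypothetical.

```python
from bisect import bisect_left

# Cumulative explained variance, copied from the output above.
cumulative = [
    0.10220291, 0.18175722, 0.24481569, 0.29456436, 0.33026391, 0.35925439,
    0.38692624, 0.40992876, 0.42989468, 0.44922712, 0.46595849, 0.4807573,
    0.49487187, 0.50857714, 0.52198632, 0.53437568, 0.54595938, 0.55672304,
    0.56657629, 0.576275, 0.58542725, 0.59375493, 0.6016978, 0.6091446,
    0.61631158, 0.6233919, 0.63015033, 0.63636441, 0.64246036, 0.64830567,
    0.65403929, 0.65961538, 0.66491309, 0.66995843, 0.67474832,
]


def components_for(threshold: float):
    """Smallest number of components reaching the threshold, or None if not reached."""
    i = bisect_left(cumulative, threshold)
    return i + 1 if i < len(cumulative) else None


print(components_for(0.50))  # 14
print(components_for(0.60))  # 23
print(components_for(0.90))  # None
```

A threshold of 90% is not reached with `k=35`, so a larger `k` would be needed to retain that much variance.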

In [94]:
reduced_features.show(2)

+--------------------+----------+--------------------+--------------------+
|                path|     label|            features|        pca_features|
+--------------------+----------+--------------------+--------------------+
|s3://p8-data-flo/...|Watermelon|[0.12482018023729...|[-2.9710018795316...|
|s3://p8-data-flo/...|Watermelon|[0.03623737022280...|[-2.5668312036456...|
+--------------------+----------+--------------------+--------------------+
only showing top 2 rows

In [95]:
reduced_features.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- pca_features: vector (nullable = true)

## Saving the results to a CSV file in S3
The file contains:
- the image path (`path`)
- the label (`label`)
- the features (1,280 values)
- the principal components (35 values)

In [91]:
import json
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# UDF that formats the cells of the "features" column
def format_features(features_array):
    return json.dumps(features_array.tolist())

# Register the UDF
format_features_udf = udf(format_features, StringType())

# UDF that formats the cells of the "pca_features" column
def format_pca_features(pca_features_array):
    return json.dumps(pca_features_array.tolist())



In [92]:
# Register the UDF
format_pca_features_udf = udf(format_pca_features, StringType())

# Add the new columns "formatted_features" and "formatted_pca_features",
# which hold the data as strings enclosed in square brackets [ ]
df_formatted = reduced_features.withColumn("formatted_features", format_features_udf("features")) \
                .withColumn("formatted_pca_features", format_pca_features_udf("pca_features"))


In [96]:
df_formatted.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- pca_features: vector (nullable = true)
 |-- formatted_features: string (nullable = true)
 |-- formatted_pca_features: string (nullable = true)

In [97]:
df_formatted.write.mode("append").parquet("s3://p8-data-flo/PCA-CVS/")


In [98]:
# Import the json module
import json

# UDF that converts a vector to a JSON string
def to_json(col):
    return json.dumps(col.tolist())

# Register the UDF
to_json_udf = udf(to_json, StringType())


In [99]:
df_json = df_formatted.withColumn("json_features", to_json_udf("features")) \
                     .withColumn("json_pca_features", to_json_udf("pca_features"))

In [105]:
# Coalesce the DataFrame into a single partition so that a single CSV file is written
df_formatted_single_partition = df_json.coalesce(1)
df_formatted_single_partition.select("path", "label", "json_features", "json_pca_features").write.mode("append").csv("s3://p8-data-flo/PCA-CVS/")
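Because each vector is stored as one JSON string, a consumer of the CSV has to decode that cell back into a list of floats. The sketch below shows the round trip with the standard library only; the feature values are illustrative, not real output of the pipeline.

```python
import csv
import io
import json

row = {
    "path": "s3://p8-data-flo/Test/Watermelon/r_90_100.jpg",
    "label": "Watermelon",
    "features": [0.125, 0.0, 4.5],  # illustrative values, not real features
}

# Write: the vector becomes one JSON string, so its commas stay inside one quoted cell.
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow([row["path"], row["label"], json.dumps(row["features"])])

# Read back: parse the CSV cell, then decode the JSON string into a list of floats.
buffer.seek(0)
path, label, features_json = next(csv.reader(buffer))
features = json.loads(features_json)
print(features)  # [0.125, 0.0, 4.5]
```

Note that the CSV written above has no header row, so a reader must know the column order (`path`, `label`, `json_features`, `json_pca_features`).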
