# Déployez un modèle dans le cloud (suite)

# 4. Déploiement de la solution sur le cloud

### 4.1 Démarrage de la session Spark

In [None]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1747234137896_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [None]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1747234137896_0001,pyspark,idle,Link,Link,,✔


### 4.2 Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

### 4.3 Import des librairies

In [None]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf, col
from pyspark.sql.types import ArrayType, DoubleType, FloatType

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.4 Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [None]:
# définir le chemin d'accès au bucket p9-compartiment, au répertoire d'images Test_aws et au dossier Results où seront 
# enregistrés les résultats
PATH = 's3://p9-compartiment'
PATH_Data = PATH+'/Test_aws'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://p9-compartiment
PATH_Data:   s3://p9-compartiment/Test_aws
PATH_Result: s3://p9-compartiment/Results

### 4.5 Traitement des données

#### 4.5.1 Chargement des données
<p>Pour des raison de coûts nous n'allons charger que 5 images par variété de fruit, soit 131 x 5 = 655 images </p>

In [None]:
# on charge les fichiers binaires (en l'occurrence les images dans le dossier Test_aws) à partir de PATH_DATA défini plus haut
# tout en filtrant pour ne récupérer que les images au format jpg
# l'option recursiveFileLookup effectue une recherche récursive dans tous les sous-dossiers.
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# afficher les 5 premières lignes du dataframe pyspark
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p9-compartim...|2025-05-11 14:35:01|  7324|[FF D8 FF E0 00 1...|
|s3://p9-compartim...|2025-05-11 14:35:01|  7314|[FF D8 FF E0 00 1...|
|s3://p9-compartim...|2025-05-11 14:34:50|  7311|[FF D8 FF E0 00 1...|
|s3://p9-compartim...|2025-05-11 14:34:50|  7215|[FF D8 FF E0 00 1...|
|s3://p9-compartim...|2025-05-11 14:35:02|  6824|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [None]:
# On éclate chaque chaîne de caractères de 'path' au niveau des /, on récupère l'avant-dernier morceau en guise de label,
# puis on ajoute une nouvelle colonne au dataframe pour y stocker les labels
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))

# on affiche les infos du dataframe avec printSchema() (équivalent du df.info() de pandas)
print(images.printSchema())

# on sélectionne uniquement les colonnes path et label et on affiche les 5 premières ligne
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+---------------------------------------------------------+--------------+
|path                                                     |label         |
+---------------------------------------------------------+--------------+
|s3://p9-compartiment/Test_aws/Pineapple Mini/168_100.jpg |Pineapple Mini|
|s3://p9-compartiment/Test_aws/Pineapple Mini/244_100.jpg |Pineapple Mini|
|s3://p9-compartiment/Test_aws/Raspberry/274_100.jpg      |Raspberry     |
|s3://p9-compartiment/Test_aws/Raspberry/51_100.jpg       |Raspberry     |
|s3://p9-compartiment/Test_aws/Pineapple Mini/r_77_100.jpg|Pineapple Mini|
+---------------------------------------------------------+--------------+
only showing top 5 rows

None

#### 4.5.2 Préparation du modèle

In [None]:
# charger le modèle MobileNetV2 en conservant la dernière couche (de classification)
# le modèle admet des images RGB en 224 par 224 pixels
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

[1m       0/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m0s[0m 0s/step
[1m   49152/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m24s[0m 2us/step
[1m   81920/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m28s[0m 2us/step
[1m  212992/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m14s[0m 1us/step
[1m  376832/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m9s[0m 1us/step 
[1m  557056/14536120[0m [37m━━━━━━━━━━━━━━━━━━━━[0m [1m7s[0m 1us/step
[1m  950272/14536120[0m [32m━[0m[37m━━━━━━━━━━━━━━━━━━━[0m [1m5s[0m 

In [None]:
# puisque nous n'allons pas classifier les images mais extraire leurs features
# nous créons un nouveau modèle en retirant la couche de classification
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# les poids du modèle sont chargés et stockés dans la mémoire RAM du driver
# le driver diffuse une copie complète des poids à chaque machine exécutante
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# afficher les infos sur l'architecture du modèle pré-entrainé
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "functional"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)        ┃ Output Shape      ┃    Param # ┃ Connected to      ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
│ input_layer         │ (None, 224, 224,  │          0 │ -                 │
│ (InputLayer)        │ 3)                │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1 (Conv2D)      │ (None, 112, 112,  │        864 │ input_layer[0][0] │
│                     │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ bn_Conv1            │ (None, 112, 112,  │        128 │ Conv1[0][0]       │
│ (BatchNormalizatio… │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1_relu (ReLU)   │ (None, 112, 112,  │          0 │

In [None]:
# fonction qui charge le modèle MobileNetV2, gèle ses poids,
# puis créé un nouveau modèle sans la couche de classification et doté des poids de MobileNetV2
# diffusés aux machines exécutantes
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 4.5.3 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [None]:
# fonction pour le prétraitement des images qui sont à l'origine en 100 par 100 pixels
# et qui par conséquent doivent être redimensionnées en 224 par 224
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



#### 4.5.4 Exécutions des actions d'extractions de features

In [None]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Extraire les features des images en faisant appel aux fonctions créées plus haut
# le dataframe spark "images" est divisé en 24 partitions qui seront traitées en parallèle
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Sauvegarde des résultats, génère 24 fichiers parquet puisque nous avons 24 partitions
features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 4.5.5 Réduction PCA

<u>On réduit la dimensionnalité des features image via une analyse en composantes principales</u> :

In [None]:
# fonction pour le preprocessing des images
def preprocess_data_for_pca(df):

    # convertir les listes en Spark Vectors
    vector_udf = udf(lambda x: Vectors.dense(x), VectorUDT())
    df = df.withColumn("features_vector", vector_udf("features"))

    # standardiser les données
    scaler = StandardScaler(inputCol="features_vector", outputCol="scaled_features", withMean=True, withStd=True)
    scaler_model = scaler.fit(df)
    df = scaler_model.transform(df)

    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# fonction qui effectue une PCA 
def apply_pca(df, k=50):
    """
    Applique PCA sur les features.
    :param df: DataFrame Spark contenant la colonne de features
    :param k: Nombre de composantes principales à conserver (par défaut 50)
    :return: DataFrame Spark avec la colonne PCA ajoutée
    """

    # Appliquer PCA
    pca = PCA(k=k, inputCol="scaled_features", outputCol="pca_features")
    pca_model = pca.fit(df)
    pca_df = pca_model.transform(df)

    return pca_df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# preprocessing des images avant PCA
preprocessed_df = preprocess_data_for_pca(features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# effectuer une PCA en utilisant le nombre optimal de composantes calculé en local (soit 93)
pca_features_df = apply_pca(preprocessed_df, k=93)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# Sauvegarde des features réduites dans le bucket S3 (au format parquet)
pca_features_df.write.mode("overwrite").parquet(PATH_Result + "pca")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.6 Chargement des données enregistrées et validation du résultat

In [None]:
# On charge le dataframe contenant les features sans la réduction en composantes principales
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# On l'affiche
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                           features
0  s3://p9-compartiment/Test_aws/Pineapple Mini/r...  ...  [0.009593535, 4.3991313, 0.04475324, 0.0, 0.00...
1  s3://p9-compartiment/Test_aws/Cantaloupe 2/r_3...  ...  [0.13054568, 0.0, 0.0, 0.0, 0.0, 0.045383025, ...
2  s3://p9-compartiment/Test_aws/Apple Golden 1/1...  ...  [0.06073756, 0.115641624, 0.044025578, 0.0, 0....
3  s3://p9-compartiment/Test_aws/Nectarine/146_10...  ...  [0.007164955, 0.0, 0.06656204, 0.0, 0.0, 0.021...
4  s3://p9-compartiment/Test_aws/Huckleberry/r_19...  ...  [0.8012797, 0.19527003, 0.7847723, 0.0, 2.0496...

[5 rows x 3 columns]

In [None]:
# on vérifie le nombre de dimensions d'une feature image, doit être = 1280
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [None]:
# On affiche les dimensions du df
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(655, 3)

In [None]:
# On charge le dataframe contenant les features réduites
pca_df = spark.read.parquet(PATH_Result + "pca", engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# récupérer la première ligne du DataFrame
first_row = pca_df.select("pca_features").first()

# afficher le nombre de dimensions de la feature
print(f"Nombre de dimensions de la PCA feature: {first_row['pca_features'].size}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Nombre de dimensions de la PCA feature: 93

In [None]:
# Afficher le dataframe pca_df
pca_df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------+--------------------+--------------------+--------------------+--------------------+
|                path|       label|            features|     features_vector|     scaled_features|        pca_features|
+--------------------+------------+--------------------+--------------------+--------------------+--------------------+
|s3://p9-compartim...|Cantaloupe 2|[0.0611227, 0.0, ...|[0.06112270057201...|[-0.6717448045512...|[-0.6193973845935...|
|s3://p9-compartim...|   Raspberry|[0.23333512, 0.14...|[0.23333512246608...|[-0.3364481174011...|[-1.0283540414419...|
|s3://p9-compartim...|      Lychee|[0.54699963, 2.29...|[0.54699963331222...|[0.27425504911013...|[-5.6419615837667...|
|s3://p9-compartim...|   Mangostan|[0.10335773, 0.0,...|[0.10335773229598...|[-0.5895134180869...|[3.97554452378693...|
|s3://p9-compartim...|Pear Forelle|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-0.7907503837427...|[6.26506430434513...|
+--------------------+------------+-----

In [None]:
# afficher le type des colonnes de pca_features_df
pca_features_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- features_vector: vector (nullable = true)
 |-- scaled_features: vector (nullable = true)
 |-- pca_features: vector (nullable = true)

In [None]:
# Ne conserver que les colonnes utiles
new_pca_df = pca_features_df.select("path", "label", "pca_features")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# pca_features est un vecteur dense qu'il nous faut reconvertir en liste
vector_to_list_udf = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))
new_pca_df = new_pca_df.withColumn("pca_features", vector_to_list_udf("pca_features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
new_pca_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- pca_features: array (nullable = true)
 |    |-- element: float (containsNull = true)

In [None]:
# convertir new_pca_df en dataframe pandas
pca_results = new_pca_df.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
# afficher pca_results
pca_results.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                       pca_features
0  s3://p9-compartiment/Test_aws/Pineapple Mini/r...  ...  [-9.855330467224121, 5.186450004577637, -3.607...
1  s3://p9-compartiment/Test_aws/Cantaloupe 2/r_3...  ...  [-8.517338752746582, 7.775743007659912, -0.934...
2  s3://p9-compartiment/Test_aws/Apple Golden 1/1...  ...  [1.1631702184677124, 14.654661178588867, -2.92...
3  s3://p9-compartiment/Test_aws/Nectarine/146_10...  ...  [8.980578422546387, 3.789280414581299, -0.7577...
4  s3://p9-compartiment/Test_aws/Huckleberry/r_19...  ...  [-7.43386697769165, -9.371136665344238, -7.191...

[5 rows x 3 columns]

In [None]:
# On le sauvegarde au format csv
pca_results.to_csv('s3://p9-compartiment/Results_csv/pca_features.csv', index=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.7 Résiliation du cluster EMR<br>
<p>Nous pouvons désormais résilier le cluster pour éviter les coûts supplémentaires liés à des machines inactives qui continuent de tourner. En effet, même si notre travail est terminé, la facturation continue tant que l'on a pas résilié le culster. Avant toute chose, n'oublions pas de désactiver le tunnel shh dans FoxyProxy pour éviter les problèmes de timeout. La résiliation prend 1 minute.<br><br>Note, il est possible de cloner le cluster pour obtenir une copie à l'identique et réexécuter le notebook dans les mêmes conditions.</p>