# Deploy a model in the cloud (PySpark script)

# Table of contents

&emsp;**1. Running the code in PySpark**<br/>
&emsp;&emsp;1.1 Starting the Spark session<br/>
&emsp;&emsp;1.2 Installing the packages<br/>
&emsp;&emsp;1.3 Importing the libraries<br/>
&emsp;**2. Defining the paths used to load the images and save the results**<br/>
&emsp;**3. Data processing**<br/>
&emsp;&emsp;3.1 Loading the data<br/>
&emsp;&emsp;3.2 Preparing the model<br/>
&emsp;&emsp;3.3 Defining the image-loading process and applying featurization with a pandas UDF<br/>
&emsp;&emsp;3.4 Running the feature-extraction actions<br/>
&emsp;&emsp;3.5 Loading the saved data and validating the result<br/>
&emsp;**4. Dimensionality reduction**<br/>
&emsp;&emsp;4.1 Converting array(float) to Vector<br/>
&emsp;&emsp;4.2 Applying Spark PCA<br/>
&emsp;&emsp;4.3 Converting to a pandas DataFrame<br/>
&emsp;&emsp;4.4 Writing to the dimensionality-reduction folder<br/>


## 1.1 Starting the Spark session

In [1]:
# Start the Spark application: running a first cell creates the Spark session

Starting Spark application


| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 7 | application_1747386696377_0008 | pyspark | idle | Link | Link | ✔ |

SparkSession available as 'spark'.

In [2]:
%%info

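| ID | YARN Application ID | Kind | State | Spark UI | Driver log | Current session? |
|----|---------------------|------|-------|----------|------------|------------------|
| 7 | application_1747386696377_0008 | pyspark | idle | Link | Link | ✔ |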


## 1.2 Installing the packages

The required packages were installed during the **bootstrap** step, when the cluster was instantiated.
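A quick way to confirm from the notebook that the bootstrap step did its job is to check that the packages can be found by the import system. The sketch below uses only the standard library; the package list is an assumption based on the imports used in section 1.3 (plus PyArrow, which pandas UDFs rely on), and the check only covers the driver, not the executors.

```python
import importlib.util


def missing_packages(names):
    """Return the top-level package names that the import system cannot find."""
    return [name for name in names if importlib.util.find_spec(name) is None]


# Assumed package list, taken from the imports used later in the notebook
required = ["pandas", "numpy", "PIL", "tensorflow", "pyarrow"]
print("Missing packages:", missing_packages(required))
```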

In [3]:
#import tree
#print(tree.__file__)

## 1.3 Importing the libraries

In [4]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

### 2. Defining the paths used to load the images and save the results

We access our **data on S3** directly, as if it were **stored locally**.

In [5]:
PATH = 's3://buckets-fruits'
PATH_Data = PATH+'/test1'
PATH_Result = PATH+'/Results'
PATH_Reduction_dimension = PATH+'/Reductions_dimensions'
print('PATH:        ' + PATH
      + '\nPATH_Data:   ' + PATH_Data
      + '\nPATH_Result: ' + PATH_Result
      + '\nPATH_Result_dimension: ' + PATH_Reduction_dimension)

PATH:        s3://buckets-fruits
PATH_Data:   s3://buckets-fruits/test1
PATH_Result: s3://buckets-fruits/Results
PATH_Result_dimension: s3://buckets-fruits/Reductions_dimensions
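The paths above are plain `s3://bucket/prefix` URIs that Spark on EMR reads directly. If the same locations are ever needed outside Spark (for example with boto3, which takes the bucket and the key prefix separately), a small helper like the hypothetical `split_s3_uri` below makes the split explicit. It is a sketch, not part of the original notebook.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split an 's3://bucket/prefix' URI into (bucket, prefix)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3":
        raise ValueError(f"not an S3 URI: {uri}")
    # netloc holds the bucket name; the path starts with '/', which is stripped
    return parsed.netloc, parsed.path.lstrip("/")


print(split_s3_uri("s3://buckets-fruits/test1"))
# → ('buckets-fruits', 'test1')
```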

### 3. Data processing

#### 3.1 Loading the data

In [6]:
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(PATH_Data)
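With `pathGlobFilter` set to `*.jpg` and `recursiveFileLookup` enabled, Spark reads every `.jpg` file below `PATH_Data`, whatever its depth in the folder tree. For a local test on a small sample, a rough pure-Python equivalent could look like the sketch below; it uses `pathlib`, the function names are hypothetical, and it only mirrors the `path`, `length` and `content` columns of the `binaryFile` source.

```python
from pathlib import Path


def list_jpg_files(root):
    """Recursively list the .jpg files under root (like recursiveFileLookup + pathGlobFilter)."""
    return sorted(str(p) for p in Path(root).rglob("*.jpg") if p.is_file())


def read_binary_files(root):
    """Return (path, length, content) tuples, mirroring some of the binaryFile columns."""
    rows = []
    for path in list_jpg_files(root):
        content = Path(path).read_bytes()
        rows.append((path, len(content), content))
    return rows
```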

In [7]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://buckets-frui...|2025-05-07 15:19:37| 10412|[FF D8 FF E0 00 1...|
|s3://buckets-frui...|2025-05-07 15:19:37| 10224|[FF D8 FF E0 00 1...|
|s3://buckets-frui...|2025-05-07 15:19:38|  9931|[FF D8 FF E0 00 1...|
|s3://buckets-frui...|2025-05-07 15:19:37|  9873|[FF D8 FF E0 00 1...|
|s3://buckets-frui...|2025-05-07 15:19:37|  9813|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

In [8]:
from pyspark.sql.functions import split, element_at

# The label is the name of the folder containing each image:
# the second-to-last element of the path split on '/'
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+------------------------------------------------------+-------------+
|path                                                  |label        |
+------------------------------------------------------+-------------+
|s3://buckets-fruits/test1/Blackberrie 2/r0_203_100.jpg|Blackberrie 2|
|s3://buckets-fruits/test1/Blackberrie 2/r0_207_100.jpg|Blackberrie 2|
|s3://buckets-fruits/test1/Blackberrie 2/r0_199_100.jpg|Blackberrie 2|
|s3://buckets-fruits/test1/Blackberrie 2/r0_195_100.jpg|Blackberrie 2|
|s3://buckets-fruits/test1/Blackberrie 2/r0_191_100.jpg|Blackberrie 2|
+------------------------------------------------------+-------------+
only showing top 5 rows
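`element_at(split(path, '/'), -2)` keeps the second-to-last segment of the path, i.e. the name of the folder that contains the image, which serves as the class label. The same rule in plain Python, as a sketch for checking paths locally (the function name is hypothetical and it is not used by the Spark job):

```python
def label_from_path(path):
    """Return the parent folder name of an image path, like element_at(split(path, '/'), -2)."""
    parts = path.split("/")
    if len(parts) < 2:
        raise ValueError(f"path has no parent folder: {path}")
    return parts[-2]


print(label_from_path("s3://buckets-fruits/test1/Blackberrie 2/r0_203_100.jpg"))
# → Blackberrie 2
```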

#### 3.2 Preparing the model

In [9]:
#%pip install --user dm-tree

In [10]:
#from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2

model = MobileNetV2(
    weights='imagenet',
    include_top=True,
    input_shape=(224, 224, 3)
)

In [11]:
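# Drop the final classification layer: the penultimate layer (global average
# pooling) outputs one 1280-value feature vector per image
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)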

In [12]:
# Broadcast the weights once to every executor so that each one can rebuild the model locally
brodcast_weights = sc.broadcast(new_model.get_weights())

In [13]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [14]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    # weights=None: the pretrained ImageNet weights come from the broadcast
    # variable, so they do not need to be downloaded again on each executor
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

#### 3.3 Defining the image-loading process and applying featurization with a pandas UDF

In [15]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # convert('RGB') guarantees 3 channels even for grayscale or RGBA files
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # 'batch' rather than 'input' to avoid shadowing the Python built-in
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
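For MobileNetV2, `preprocess_input` rescales pixel values from [0, 255] to [-1, 1] (x / 127.5 - 1). The NumPy sketch below reproduces that scaling together with the batching and flattening done in `featurize_series`, with a dummy "model" (an average over height and width, roughly what global average pooling does) standing in for MobileNetV2. It only illustrates the data shapes; the function names are hypothetical and it is not a replacement for the Keras function.

```python
import numpy as np


def scale_like_mobilenet_v2(arr):
    """Scale pixel values from [0, 255] to [-1, 1], as MobileNetV2 preprocessing does."""
    return arr.astype(np.float32) / 127.5 - 1.0


def fake_featurize(sample_images, predict):
    """Stack preprocessed images into one batch, run predict, flatten each output."""
    batch = np.stack([scale_like_mobilenet_v2(img) for img in sample_images])  # (n, 224, 224, 3)
    preds = predict(batch)
    return [p.flatten() for p in preds]


# Dummy "model": mean over height and width, one value per channel
sample_images = [np.full((224, 224, 3), 255, dtype=np.uint8),
                 np.zeros((224, 224, 3), dtype=np.uint8)]
sample_features = fake_featurize(sample_images, lambda batch: batch.mean(axis=(1, 2)))
print([f.tolist() for f in sample_features])
# → [[1.0, 1.0, 1.0], [-1.0, -1.0, -1.0]]
```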

#### 3.4 Running the feature-extraction actions

In [16]:
#spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")
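With a Scalar Iterator pandas UDF, Spark hands each task an iterator over pandas Series batches; the number of rows per batch is bounded by `spark.sql.execution.arrow.maxRecordsPerBatch` (10,000 by default), which the commented-out line above would lower to 1024. The pure-Python sketch below (no Spark involved, all names hypothetical) mimics that contract to show why `featurize_udf` builds the model once per iterator rather than once per batch.

```python
from itertools import islice


def iter_batches(rows, batch_size):
    """Yield successive lists of at most batch_size rows (like Arrow record batches)."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch


def make_iterator_udf(load_model):
    """Wrap a per-row model the way featurize_udf does: load once, reuse for every batch."""
    def iterator_udf(batch_iter):
        model = load_model()  # expensive step, done once per iterator
        for batch in batch_iter:
            yield [model(x) for x in batch]
    return iterator_udf


loads = []

def load_model():
    loads.append(1)  # count how many times the "model" is loaded
    return lambda x: x * 2


double_udf = make_iterator_udf(load_model)
outputs = [y for batch in double_udf(iter_batches(range(10), batch_size=4)) for y in batch]
print(outputs, "model loads:", len(loads))
# → [0, 2, 4, 6, 8, 10, 12, 14, 16, 18] model loads: 1
```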

In [17]:
features_df = images.repartition(24).select(
    col("path"),
    col("label"),
    featurize_udf("content").alias("features")
)

In [18]:
print(PATH_Result)

s3://buckets-fruits/Results

In [19]:
features_df.write.mode("overwrite").parquet(PATH_Result)

In [20]:
features_df.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)

### 4. Dimensionality reduction

In [21]:
from pyspark.ml.feature import PCA as SparkPCA
from pyspark.ml import Pipeline
from pyspark.ml.functions import vector_to_array
from pyspark.sql.functions import col
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, FloatType

from pyspark.ml.feature import SQLTransformer

#### 4.1 Converting array(float) to Vector

In [22]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT

def array_to_vector(array):
    return Vectors.dense(array)

array_to_vector_udf = udf(array_to_vector, VectorUDT())

df_vector = features_df.withColumn("features_vector", array_to_vector_udf(col("features")))

#### 4.2 Applying Spark PCA

In [23]:
pca = SparkPCA(k=2, inputCol="features_vector", outputCol="pca_features")
pca_model = pca.fit(df_vector)
df_pca = pca_model.transform(df_vector)
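`SparkPCA(k=2)` projects each feature vector onto the two directions of largest variance. The NumPy sketch below shows the textbook computation (centre the data, eigendecompose the covariance matrix, keep the top k eigenvectors) on synthetic data. It illustrates the technique only: Spark's implementation details may differ, for example in how the data is centred before projection. On the Spark side, `pca_model.explainedVariance` reports the proportion of variance captured by each component, which is worth checking when keeping only two.

```python
import numpy as np


def pca_project(X, k):
    """Project the rows of X onto the k principal components (covariance eigendecomposition)."""
    X = np.asarray(X, dtype=float)
    centered = X - X.mean(axis=0)
    cov = np.cov(centered, rowvar=False)
    eigvals, eigvecs = np.linalg.eigh(cov)      # eigenvalues in ascending order
    order = np.argsort(eigvals)[::-1][:k]       # indices of the k largest
    components = eigvecs[:, order]              # shape (n_features, k)
    explained = eigvals[order] / eigvals.sum()  # explained variance ratio
    return centered @ components, explained


rng = np.random.default_rng(0)
# 100 synthetic 5-D points, stretched so that most of the variance lies on the first axes
X = rng.normal(size=(100, 5)) * np.array([10.0, 5.0, 1.0, 0.5, 0.1])
projected, ratio = pca_project(X, k=2)
print(projected.shape, ratio.round(2))
```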

In [24]:
# Display the PCA features
df_pca.select("pca_features").show(truncate=False)

+----------------------------------------+
|pca_features                            |
+----------------------------------------+
|[-4.153705700132245,3.7068501442642146] |
|[-1.3944086338490738,2.3849686479588876]|
|[7.484142196654506,-0.5519109713388295] |
|[6.275990758331985,-2.970526260640793]  |
|[1.6761432069824573,-6.075926321502753] |
|[-4.556448536868136,1.2401386417773466] |
|[3.401410855022288,-5.812559854635569]  |
|[-1.5583745376318747,8.200283896891001] |
|[-4.183779426921179,4.794082091013332]  |
|[-2.5444473384018655,8.115793272085293] |
|[4.794053987835644,-3.9058588792562463] |
|[3.9217100179536226,-4.8264278027401435]|
|[3.877919112752556,-6.879753979085583]  |
|[-5.919059992271315,6.257364498619317]  |
|[-3.5821213252721575,6.956241395561469] |
|[-0.5643366998122693,4.077889963325522] |
|[-3.145070826309212,3.09201743044584]   |
|[3.3586610848093703,-6.821362374031204] |
|[-6.183148649147887,2.3361680990104667] |
|[-2.3552177691861003,1.4168034954023314]|
+----------------------------------------+

#### 4.3 Converting to a pandas DataFrame

In [25]:
df_pca.select("pca_features", "label")

DataFrame[pca_features: vector, label: string]

In [28]:
dplot = df_pca.select("pca_features", "label").toPandas()
dplot.head()

                                pca_features                     label
0    [-1.618076612985385, 9.423807793702021]             Blackberrie 2
1  [-2.0402846306696887, 3.0430530564627625]        Cactus fruit red 1
2   [3.0383686463886637, -5.495064844416145]                  Apple 13
3   [1.857876194081494, -6.1054805930805784]                  Apple 13
4  [-0.28232656237803655, 6.214937602035394]  Blackberrie not rippen 1
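To plot the two components, for example coloured by `label`, it helps to split `pca_features` into two numeric columns. A sketch, assuming each entry supports indexing like a 2-element sequence (Spark's `DenseVector` does); the function name and the column names `pc1` and `pc2` are hypothetical:

```python
import pandas as pd


def split_pca_columns(df, vector_col="pca_features"):
    """Return a copy of df with the first two vector components as pc1 and pc2."""
    out = df.copy()
    out["pc1"] = df[vector_col].map(lambda v: float(v[0]))
    out["pc2"] = df[vector_col].map(lambda v: float(v[1]))
    return out.drop(columns=[vector_col])


# Two rows copied from the dplot.head() output, with plain lists instead of DenseVector
sample = pd.DataFrame({
    "pca_features": [[-1.618076612985385, 9.423807793702021],
                     [-2.0402846306696887, 3.0430530564627625]],
    "label": ["Blackberrie 2", "Cactus fruit red 1"],
})
print(split_pca_columns(sample))
```

With the real data, `split_pca_columns(dplot)` would give a DataFrame ready for a 2-D scatter plot of `pc1` against `pc2`.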

#### 4.4 Writing to the dimensionality-reduction folder

In [26]:
print(PATH_Reduction_dimension)

s3://buckets-fruits/Reductions_dimensions

In [27]:
df_pca.select("pca_features", "label").write.mode("overwrite").parquet(PATH_Reduction_dimension)

### 5. Conclusion

In this project, our main goal was to prepare for a future increase in workload while ensuring the scalability and reliability of our computing environment.<br />

To do so, we chose **Amazon Web Services (AWS)**, which lets us rent computing power on demand through EC2 (Elastic Compute Cloud), a service in the Infrastructure as a Service (IaaS) category. This approach gives us a high degree of flexibility at a controlled cost.<br />

We went one step further by choosing a higher-level service, **Amazon EMR (Elastic MapReduce)**, which falls under the Platform as a Service (PaaS) model. EMR let us automatically provision a cluster of several machines, with all the libraries our project needs, such as **Spark, Hadoop, JupyterHub and TensorFlow**, installed and configured.<br />

This solution proved quick, reliable and effective to set up, thanks to an infrastructure validated and maintained by AWS engineers. Installing the additional packages on every node of the cluster went smoothly.<br />

We were thus able to run our notebooks exactly as in our local tests, this time with distributed processing power applied to all the images in our Test folder.<br />

For **storage**, we used **Amazon S3**, an object storage service that offers high availability, large and extensible capacity, and costs proportional to actual usage. This lets us manage our data efficiently, without volume constraints.<br />

Finally, the solution gives us immediate scalability: if the load increases, we can resize the cluster horizontally (more nodes) or vertically (larger instances) as needed. This ability to adapt quickly preserves a very good performance/cost ratio, without the burden of investing in physical hardware or dedicated servers.<br />