## Démarrage de la session Spark

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1725826868157_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Affichage des informations sur la session en cours et liens vers Spark UI

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1725826868157_0002,pyspark,idle,Link,Link,,✔


## Import des librairies

In [3]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Définition des PATH pour charger les images et enregistrer les résultats

In [4]:
# S3 locations: bucket root, input-image prefix, and output prefix for the results.
PATH = 's3://nadat-projet9'
PATH_Data = f'{PATH}/Images'
PATH_Result = f'{PATH}/Results'

# Echo the three locations so the run output records where data is read and written.
print(f'PATH:        {PATH}\nPATH_Data:   {PATH_Data}\nPATH_Result: {PATH_Result}')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://nadat-projet9
PATH_Data:   s3://nadat-projet9/Images
PATH_Result: s3://nadat-projet9/Results

## Traitement des données
### Chargement des données

In [5]:
# Load every *.jpg found under PATH_Data (sub-folders included) with the
# "binaryFile" reader.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Preview the first 5 rows of the loaded images DataFrame.
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://nadat-projet...|2024-09-08 19:54:06|  4893|[FF D8 FF E0 00 1...|
|s3://nadat-projet...|2024-09-08 19:54:06|  4875|[FF D8 FF E0 00 1...|
|s3://nadat-projet...|2024-09-08 19:54:05|  4841|[FF D8 FF E0 00 1...|
|s3://nadat-projet...|2024-09-08 19:54:05|  4830|[FF D8 FF E0 00 1...|
|s3://nadat-projet...|2024-09-08 19:54:04|  4371|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

In [7]:
# Add a 'label' column: the second-to-last '/'-separated component of the file
# path, i.e. the name of the folder that directly contains the image.
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))

# printSchema() and show() write their output themselves and return None, so they
# are called directly; wrapping them in print() only adds a stray "None" line.
images.printSchema()
images.select('path','label').show(5,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+----------------------------------------------+-------+
|path                                          |label  |
+----------------------------------------------+-------+
|s3://nadat-projet9/Images/Avocado/r_4_100.jpg |Avocado|
|s3://nadat-projet9/Images/Avocado/r_5_100.jpg |Avocado|
|s3://nadat-projet9/Images/Avocado/r_41_100.jpg|Avocado|
|s3://nadat-projet9/Images/Avocado/r_40_100.jpg|Avocado|
|s3://nadat-projet9/Images/Avocado/87_100.jpg  |Avocado|
+----------------------------------------------+-------+
only showing top 5 rows

None

### Préparation du modèle

In [8]:
# Full MobileNetV2 (classification head included) with ImageNet weights,
# built for 224x224 three-channel inputs.
model = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=True,
    weights='imagenet',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Feature extractor: same input as `model`, but the output is taken from the
# second-to-last layer, so the final layer of `model` is dropped.
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Broadcast the feature-extractor weights through the SparkContext; model_fn reads
# them back with `.value` and refers to this variable by this exact spelling.
# NOTE(review): `sc` is not defined in the visible cells — presumably injected by the
# Spark session; confirm.
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Print the layer-by-layer summary of the truncated model.
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [12]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor and load the broadcast weights into it.

    The full MobileNetV2 architecture is instantiated, every layer is frozen, and
    the network is cut at its second-to-last layer. The weights stored in
    `brodcast_weights` are then copied into the truncated model.

    :return: a Keras Model mapping a (224, 224, 3) image batch to the output of
             the second-to-last MobileNetV2 layer
    """
    # weights=None: only the architecture is needed here. Every weight is replaced
    # by the broadcast values below, so loading the ImageNet weights again in each
    # call would be wasted work (and a potential download on each worker).
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    # The model is used for inference only.
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Featurisation

In [13]:
def preprocess(content):
    """
    Turn the raw bytes of one image file into a model-ready array.

    :param content: encoded image bytes (the `content` column of the binaryFile reader)
    :return: the array produced by `preprocess_input` for the image resized to 224x224
    """
    # Force three channels: grayscale, palette, RGBA or CMYK files would otherwise
    # yield arrays with a different channel count and break batching/prediction.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Compute image features for a pd.Series of raw image bytes.

    Every element is run through `preprocess`, the results are stacked into one
    batch array, and `model.predict` is called once on that batch.
    :return: a pd.Series with one flattened feature vector per input image
    """
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # A prediction may be a multi-dimensional tensor depending on the output layer;
    # each one is flattened to a 1-D vector so it can be stored in a DataFrame column.
    return pd.Series([prediction.flatten() for prediction in predictions])

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Scalar Iterator pandas UDF around `featurize_series`.
    The decorator declares the result as a Spark column of type array<float>.

    :param content_series_iter: iterator over batches, each batch being a pandas
                                Series of raw image bytes.
    '''
    # Build the model a single time per invocation and share it across every batch
    # of the iterator, instead of paying the model-loading cost for each batch.
    feature_extractor = model_fn()
    for batch_series in content_series_iter:
        yield featurize_series(feature_extractor, batch_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [14]:
# Spread the images over 20 partitions, then keep path and label and add the
# feature vector computed by featurize_udf from the raw file content.
features_df = (
    images
    .repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# List each column of features_df together with its Spark type.
features_df.dtypes

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('path', 'string'), ('label', 'string'), ('features', 'array<float>')]

In [16]:
# Display the first rows of features_df (path, label, extracted features).
features_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+--------------------+
|                path|  label|            features|
+--------------------+-------+--------------------+
|s3://nadat-projet...|Apricot|[0.38278008, 0.37...|
|s3://nadat-projet...|Apricot|[0.2248509, 0.380...|
|s3://nadat-projet...|Apricot|[0.28019404, 0.18...|
|s3://nadat-projet...|Apricot|[0.27679187, 0.51...|
|s3://nadat-projet...|Apricot|[0.5861873, 0.444...|
|s3://nadat-projet...|Avocado|[0.6303439, 0.0, ...|
|s3://nadat-projet...|Apricot|[0.24216104, 0.60...|
|s3://nadat-projet...|Avocado|[1.370346, 0.0, 0...|
|s3://nadat-projet...|Apricot|[0.22149958, 0.30...|
|s3://nadat-projet...|Avocado|[0.27027053, 0.0,...|
|s3://nadat-projet...|Apricot|[0.12042856, 0.24...|
|s3://nadat-projet...|Avocado|[0.36889747, 0.0,...|
|s3://nadat-projet...|Apricot|[0.18892747, 0.25...|
|s3://nadat-projet...|Avocado|[0.87467134, 0.0,...|
|s3://nadat-projet...|Apricot|[0.3087985, 0.253...|
|s3://nadat-projet...|Avocado|[0.8457122, 0.0, ...|
|s3://nadat-

## Transformation de l'array des features en Vector

In [17]:
# Python UDF turning an array column value into a dense ML vector (VectorUDT),
# the input type expected by the StandardScaler and PCA stages used below.
# NOTE(review): on Spark >= 3.1, pyspark.ml.functions.array_to_vector performs this
# conversion without a Python UDF — consider switching once the cluster version is confirmed.
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Same rows as features_df, with the 'features' array replaced by a dense vector.
df_with_vectors = features_df.select(
    col("path"),
    col("label"),
    list_to_vector_udf(col("features")).alias("features"),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Display the first rows of the DataFrame whose 'features' column is now a vector.
df_with_vectors.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+--------------------+
|                path|  label|            features|
+--------------------+-------+--------------------+
|s3://nadat-projet...|Apricot|[0.38278007507324...|
|s3://nadat-projet...|Apricot|[0.22485089302062...|
|s3://nadat-projet...|Apricot|[0.28019404411315...|
|s3://nadat-projet...|Apricot|[0.27679187059402...|
|s3://nadat-projet...|Apricot|[0.58618730306625...|
|s3://nadat-projet...|Avocado|[0.63034391403198...|
|s3://nadat-projet...|Apricot|[0.24216103553771...|
|s3://nadat-projet...|Avocado|[1.37034595012664...|
|s3://nadat-projet...|Apricot|[0.22149957716464...|
|s3://nadat-projet...|Avocado|[0.27027052640914...|
|s3://nadat-projet...|Apricot|[0.12042856216430...|
|s3://nadat-projet...|Avocado|[0.36889746785163...|
|s3://nadat-projet...|Apricot|[0.18892747163772...|
|s3://nadat-projet...|Avocado|[0.87467133998870...|
|s3://nadat-projet...|Apricot|[0.30879849195480...|
|s3://nadat-projet...|Avocado|[0.84571218490600...|
|s3://nadat-

## Standardisation

In [20]:
# Fit a standardizer (centering and unit-variance scaling both enabled) on the
# feature vectors; `scaler` holds the fitted model returned by fit().
scaler = StandardScaler(
    withMean=True, withStd=True,
    inputCol='features', outputCol='scaledFeatures',
).fit(df_with_vectors)

# Append the 'scaledFeatures' column and preview 6 rows.
df_scaled = scaler.transform(df_with_vectors)
df_scaled.show(6)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+--------------------+--------------------+
|                path|  label|            features|      scaledFeatures|
+--------------------+-------+--------------------+--------------------+
|s3://nadat-projet...|Apricot|[0.38278007507324...|[-0.1937736697186...|
|s3://nadat-projet...|Apricot|[0.22485089302062...|[-0.6844710784107...|
|s3://nadat-projet...|Apricot|[0.28019404411315...|[-0.5125158990258...|
|s3://nadat-projet...|Apricot|[0.27679187059402...|[-0.5230866985960...|
|s3://nadat-projet...|Apricot|[0.58618730306625...|[0.43822732204339...|
|s3://nadat-projet...|Avocado|[0.63034391403198...|[0.57542511092506...|
+--------------------+-------+--------------------+--------------------+
only showing top 6 rows

## Réduction de dimension

In [21]:
# Project the standardized features onto their first 2 principal components.
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
# Named pca_model (not `model`) so the fitted PCA does not overwrite the Keras
# `model` that earlier cells use to build the feature extractor; re-running those
# cells after this one would otherwise fail.
pca_model = pca.fit(df_scaled)
result = pca_model.transform(df_scaled)
print('Explained Variance Ratio', pca_model.explainedVariance.toArray())
result.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Explained Variance Ratio [0.3472032  0.17626964]
+--------------------+-------+--------------------+--------------------+--------------------+
|                path|  label|            features|      scaledFeatures|         pcaFeatures|
+--------------------+-------+--------------------+--------------------+--------------------+
|s3://nadat-projet...|Apricot|[0.38278007507324...|[-0.1937736697186...|[20.9019851532750...|
|s3://nadat-projet...|Apricot|[0.22485089302062...|[-0.6844710784107...|[20.7926573018974...|
|s3://nadat-projet...|Apricot|[0.28019404411315...|[-0.5125158990258...|[15.8381511471682...|
|s3://nadat-projet...|Apricot|[0.27679187059402...|[-0.5230866985960...|[21.0348574841457...|
|s3://nadat-projet...|Apricot|[0.58618730306625...|[0.43822732204339...|[19.9283528930738...|
|s3://nadat-projet...|Avocado|[0.63034391403198...|[0.57542511092506...|[-13.775099223625...|
|s3://nadat-projet...|Apricot|[0.24216103553771...|[-0.6306872117001...|[20.7914339422817...|
|s3://nadat

### Rappel du PATH où seront inscrits les fichiers au format "parquet" :

In [22]:
# Show the output location used by the write cells that follow.
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://nadat-projet9/Results

### Enregistrement des données traitées au format "parquet" :

In [23]:
# Save the PCA result DataFrame as parquet under PATH_Result, replacing any
# output already present at that location ("overwrite" mode).
result.write.mode("overwrite").parquet(PATH_Result+'/parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Enregistrement des données traitées au format "csv" :

In [24]:
# Convert the whole Spark result DataFrame into a pandas DataFrame.
pca_df = result.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Display the pandas DataFrame built from the PCA result.
pca_df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                              path  ...                                 pcaFeatures
0     s3://nadat-projet9/Images/Apricot/62_100.jpg  ...    [20.901985153275024, 3.2467039021505544]
1     s3://nadat-projet9/Images/Apricot/61_100.jpg  ...      [20.79265730189749, 4.425531573184797]
2     s3://nadat-projet9/Images/Apricot/48_100.jpg  ...     [15.838151147168277, 1.795469023683046]
3     s3://nadat-projet9/Images/Apricot/58_100.jpg  ...     [21.034857484145757, 5.083685931038015]
4     s3://nadat-projet9/Images/Apricot/59_100.jpg  ...      [19.92835289307381, 6.157120090036898]
5     s3://nadat-projet9/Images/Avocado/88_100.jpg  ...   [-13.77509922362561, -21.155478645868875]
6     s3://nadat-projet9/Images/Apricot/60_100.jpg  ...     [20.791433942281753, 5.207470024107931]
7   s3://nadat-projet9/Images/Avocado/r_40_100.jpg  ...   [-26.436693221823496, 13.606508566259542]
8     s3://nadat-projet9/Images/Apricot/46_100.jpg  ...     [17.70982987502727, 1.6674815812458121]


In [26]:
# Export the pandas copy of the result as result.csv under PATH_Result.
pca_df.to_csv(f"{PATH_Result}/result.csv")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Chargement des données enregistrées et validation du résultat

In [27]:
# Read back the parquet output written earlier to check what was saved.
dfs = spark.read.parquet(PATH_Result+'/parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Convert the reloaded Spark DataFrame to pandas for inspection.
df = dfs.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## On affiche les 5 premières lignes du DataFrame

In [29]:
# Show the first 5 rows of the reloaded data.
df.head(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                             path  ...                                pcaFeatures
0   s3://nadat-projet9/Images/Avocado/r_5_100.jpg  ...   [-30.11179355144153, 25.415948272872853]
1  s3://nadat-projet9/Images/Avocado/r_41_100.jpg  ...   [-23.33054559831994, 11.092566134115879]
2  s3://nadat-projet9/Images/Avocado/r_40_100.jpg  ...  [-26.436693221823496, 13.606508566259542]
3    s3://nadat-projet9/Images/Apricot/46_100.jpg  ...    [17.70982987502727, 1.6674815812458121]
4    s3://nadat-projet9/Images/Avocado/59_100.jpg  ...  [-13.454819112693453, -18.77913367783397]

[5 rows x 5 columns]

### On vérifie la taille du vecteur de caractéristiques des images après réduction à 2 dimensions

In [30]:
# Shape of the pcaFeatures value in the row labelled 0; (2,) is expected because
# the PCA was configured with k=2.
df.loc[0,'pcaFeatures'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(2,)