<u>Display information about the current session and links to the Spark UI</u>:

In [18]:
%%info

| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|---|
| 1 | application_1717482012453_0002 | pyspark | idle | Link | Link | | ✔ |


### 1.0 Package installation

The required packages were installed by the **bootstrap** step when the server was instantiated.

### 1.1 Importing libraries

In [37]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession
from pyspark.ml.feature import StandardScaler

from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

### 1.2 Defining the paths for loading images and saving results

We access our **data on S3** directly, as if it were **stored locally**.

The S3 bucket must have a bucket policy that allows the instances to read the images and write the results.
Here is an example policy you can use in development. In production, make sure to use more restrictive permissions (no `s3:*`).

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {
                "AWS": [
                    "arn:aws:iam::267341338450:role/service-role/AmazonEMR-ServiceRole-20240603T210707",
                    "arn:aws:iam::267341338450:role/service-role/AmazonEMR-InstanceProfile-20240603T210651"
                ]
            },
            "Action": "s3:*",
            "Resource": [
                "arn:aws:s3:::267341338450-fruits-oc-data",
                "arn:aws:s3:::267341338450-fruits-oc-data/*"
            ]
        }
    ]
}
```
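
For production, the wildcard can be replaced with an explicit list of actions. The sketch below builds such a policy in Python and prints it as JSON. The action list (`s3:ListBucket`, `s3:GetObject`, `s3:PutObject`, `s3:DeleteObject`) is an assumption about what a Spark read/write job typically needs and has not been verified against this cluster; the helper `build_policy` is hypothetical.

```python
import json

BUCKET = "267341338450-fruits-oc-data"  # bucket name taken from the policy above
PRINCIPALS = [
    "arn:aws:iam::267341338450:role/service-role/AmazonEMR-ServiceRole-20240603T210707",
    "arn:aws:iam::267341338450:role/service-role/AmazonEMR-InstanceProfile-20240603T210651",
]


def build_policy(bucket, principals):
    """Build a bucket policy without the s3:* wildcard (assumed action list)."""
    return {
        "Version": "2012-10-17",
        "Statement": [
            {
                # Listing applies to the bucket itself
                "Effect": "Allow",
                "Principal": {"AWS": principals},
                "Action": ["s3:ListBucket"],
                "Resource": [f"arn:aws:s3:::{bucket}"],
            },
            {
                # Object-level actions apply to the keys inside the bucket
                "Effect": "Allow",
                "Principal": {"AWS": principals},
                "Action": ["s3:GetObject", "s3:PutObject", "s3:DeleteObject"],
                "Resource": [f"arn:aws:s3:::{bucket}/*"],
            },
        ],
    }


policy = build_policy(BUCKET, PRINCIPALS)
print(json.dumps(policy, indent=4))
```

If the job turns out to need other actions, they can be added to the second statement one at a time rather than falling back to `s3:*`.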

In [20]:
PATH = 's3://267341338450-fruits-oc-data'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        s3://267341338450-fruits-oc-data
PATH_Data:   s3://267341338450-fruits-oc-data/Test
PATH_Result: s3://267341338450-fruits-oc-data/Results

### 1.3 Data processing
#### 1.3.1 Loading the data

In [21]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [22]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://267341338450...|2024-06-03 18:29:34|  7353|[FF D8 FF E0 00 1...|
|s3://267341338450...|2024-06-03 18:29:34|  7350|[FF D8 FF E0 00 1...|
|s3://267341338450...|2024-06-03 18:29:34|  7349|[FF D8 FF E0 00 1...|
|s3://267341338450...|2024-06-03 18:29:34|  7348|[FF D8 FF E0 00 1...|
|s3://267341338450...|2024-06-03 18:29:34|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>I keep only the image **path** and add <br />
    a column containing the **label** of each image</u>:

In [23]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
images.printSchema()
images.select('path','label').show(5,False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+--------------------------------------------------------------+----------+
|path                                                          |label     |
+--------------------------------------------------------------+----------+
|s3://267341338450-fruits-oc-data/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://267341338450-fruits-oc-data/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://267341338450-fruits-oc-data/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://267341338450-fruits-oc-data/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://267341338450-fruits-oc-data/Test/Watermelon/r_95_100.jpg |Watermelon|
+--------------------------------------------------------------+----------+
only showing top 5 rows
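
The label is the parent folder of each image: `split` cuts the path on `/` and `element_at(..., -2)` takes the second-to-last piece. A plain-Python equivalent, for illustration only (the helper name `label_from_path` is hypothetical):

```python
def label_from_path(path: str) -> str:
    """Return the second-to-last path component, i.e. the image's parent folder."""
    return path.split("/")[-2]


path = "s3://267341338450-fruits-oc-data/Test/Watermelon/r_106_100.jpg"
print(label_from_path(path))  # Watermelon
```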

#### 1.3.2 Preparing the model

In [24]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [25]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

We broadcast the model weights to the workers so that every instance runs the featurization with the same parameters.

In [26]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [27]:
new_model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_2[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']      

#### 1.3.3 Defining the image loading process <br/> and applying featurization with a pandas UDF

In [28]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [29]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    inputs = np.stack(content_series.map(preprocess))
    preds = model.predict(inputs)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
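
The benefit of the Scalar Iterator form is that the expensive setup runs once per iterator rather than once per batch. The sketch below reproduces that pattern without Spark or TensorFlow: `load_model` and its call counter are hypothetical stand-ins for `model_fn`, and the batches are plain lists instead of pandas Series.

```python
load_calls = 0


def load_model():
    """Hypothetical stand-in for an expensive model load."""
    global load_calls
    load_calls += 1
    return lambda x: x * 2  # toy "model"


def featurize_batches(batch_iter):
    """Load the model once, then reuse it for every incoming batch."""
    model = load_model()
    for batch in batch_iter:
        yield [model(x) for x in batch]


batches = [[1, 2], [3], [4, 5, 6]]
results = list(featurize_batches(iter(batches)))
print(results)     # [[2, 4], [6], [8, 10, 12]]
print(load_calls)  # 1
```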

#### 1.3.4 Running the feature extraction

In [31]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

#### 1.3.5 Feature standardization & dimensionality reduction

In [38]:
# Convert the feature arrays to Spark MLlib vectors
array_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
features_df = features_df.withColumn("features_vector", array_to_vector_udf(col("features")))

# Standardize the data using StandardScaler
scaler = StandardScaler(inputCol="features_vector", outputCol="features_scaled", withStd=True, withMean=True)
scaler_model = scaler.fit(features_df)
features_df = scaler_model.transform(features_df)
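
With `withMean=True` and `withStd=True`, each feature column is centered on its mean and divided by its standard deviation. Below is a minimal single-column sketch using only the standard library. It uses the sample standard deviation (`statistics.stdev`), which to our understanding matches what Spark's `StandardScaler` computes; the values are made up and the helper `standardize` is hypothetical.

```python
import statistics


def standardize(values):
    """Center a column on its mean and scale it to unit sample standard deviation."""
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # sample standard deviation (n - 1)
    return [(v - mean) / std for v in values]


column = [2.0, 4.0, 6.0, 8.0]  # made-up feature values
scaled = standardize(column)
print(scaled)
```

After scaling, the column has mean 0 and sample standard deviation 1, so no single feature dominates the PCA because of its scale.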

In [39]:
num_features = len(features_df.select("features").first()[0])
print(f"Number of features: {num_features}")

rows_count = features_df.count()
print(f"Shape of features_df: ({rows_count}, {num_features + 1})")

Number of features: 1280
Shape of features_df: (22688, 1281)

We apply PCA to reduce the 1280 features while retaining 95% of the explained variance.

In [40]:
pca = PCA(k=num_features, inputCol="features_scaled", outputCol="pca_features")
pca_model = pca.fit(features_df)

# Compute the cumulative explained variance
explained_variance_ratio = pca_model.explainedVariance.cumsum()
components = np.arange(len(explained_variance_ratio)) + 1

In [41]:
num_components_95_variance = np.where(explained_variance_ratio >= 0.95)[0][0] + 1
print(f"Number of components for 95% variance: {num_components_95_variance}")

Number of components for 95% variance: 344

This reduces the number of features to 344.
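
The component count is the first position at which the cumulative explained variance reaches the threshold. The same selection in plain Python, on made-up variance ratios (the helper `components_for_variance` is hypothetical):

```python
from itertools import accumulate


def components_for_variance(ratios, threshold=0.95):
    """Return the smallest number of leading components whose cumulative
    explained variance ratio reaches the threshold."""
    for count, cumulative in enumerate(accumulate(ratios), start=1):
        if cumulative >= threshold:
            return count
    raise ValueError("threshold not reached by the given components")


ratios = [0.5, 0.25, 0.125, 0.0625, 0.03125, 0.03125]  # made-up, sums to 1.0
print(components_for_variance(ratios))  # 5
```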

In [42]:
pca_95 = PCA(k=num_components_95_variance, inputCol="features_scaled", outputCol="pca_features_95")
pca_model_95 = pca_95.fit(features_df)
features_df_pca_95 = pca_model_95.transform(features_df)

### 1.4 Result

In [43]:
features_df_pca_95.show(5)

+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|                path|         label|            features|     features_vector|     features_scaled|     pca_features_95|
+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+
|s3://267341338450...|Pineapple Mini|[0.0, 4.72348, 0....|[0.0,4.7234802246...|[-0.7574008190603...|[-5.9249880828433...|
|s3://267341338450...|Pineapple Mini|[0.007994136, 4.5...|[0.00799413584172...|[-0.7425873117677...|[-6.0254744062722...|
|s3://267341338450...|Pineapple Mini|[0.0, 4.6597733, ...|[0.0,4.6597733497...|[-0.7574008190603...|[-4.5681854504048...|
|s3://267341338450...|    Watermelon|[0.05392126, 0.10...|[0.05392125993967...|[-0.6574822045716...|[-3.2593249712840...|
|s3://267341338450...|    Watermelon|[0.11927667, 0.00...|[0.11927667260169...|[-0.5363755707534...|[-3.4228038112477...|
+--------------------+--------------+--------------------+--------------------+--------------------+--------------------+

We keep only the file path, its label and its features, and export this data to a CSV file.

In [45]:
final_result = features_df_pca_95.select('path','label','pca_features_95')

In [47]:
final_result.toPandas().to_csv('s3://267341338450-fruits-oc-data/final_result.csv')
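
A vector column written to CSV ends up as a single text cell per row. If one column per component is preferred, the vector can be expanded before writing. Below is a minimal sketch with the standard `csv` module; the rows and the `pc_i` column names are made up for illustration, and this is not the export used above.

```python
import csv
import io

rows = [  # made-up (path, label, features) records
    ("s3://bucket/Test/Watermelon/a.jpg", "Watermelon", [-3.26, 1.5, 0.25]),
    ("s3://bucket/Test/Pineapple Mini/b.jpg", "Pineapple Mini", [-5.92, 0.75, 2.0]),
]

n_components = len(rows[0][2])
header = ["path", "label"] + [f"pc_{i + 1}" for i in range(n_components)]

buffer = io.StringIO()  # in-memory file, stands in for the real destination
writer = csv.writer(buffer)
writer.writerow(header)
for path, label, features in rows:
    writer.writerow([path, label, *features])  # one cell per component

print(buffer.getvalue())
```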

We still save the full results in .parquet format in case we need to reuse them.

In [44]:
features_df_pca_95.write.mode("overwrite").parquet(PATH_Result + "_pca_95")
