# Project 9: Run a processing pipeline in a Big Data environment on the Cloud

## Context

- **Start-up:** "Fruits!"
- **Goal:** A mobile application that lets users photograph a fruit and get information about it
- **Apprentice's notebook:** P8_Notebook_Linux_EMR_PySpark_V1.0.ipynb
    - First approach in an AWS EMR environment
    - Serves as the starting point for the rest of the work
- **Data:** Fruit images with their associated labels


## Tasks

- Complete the apprentice's processing pipeline
- Do not train a model (use a pretrained model?)
- The data volume will grow very quickly after this project is delivered
    - The script must take this into account
    - Choose between AWS (EMR, S3, IAM) and Databricks for the instance
- Explain the complete PySpark script step by step
- Distribute the TensorFlow model weights across the cluster (broadcast of the model "weights")
- Add a PCA-type dimensionality reduction step in PySpark
- Comply with GDPR constraints: set up the installation so that it uses servers located on European territory

## Starting the Spark session

In [1]:
# Running this cell starts the Spark application

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1732936087553_0002,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.



In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1732936087553_0001,pyspark,idle,Link,Link,,
1,application_1732936087553_0002,pyspark,idle,Link,Link,,✔


## Package installation

The required packages were installed by the **bootstrap** step when the server was instantiated.

In [None]:
# Library imports

In [3]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split


## Defining the PATHs for loading images and saving results

We access our **data on S3** directly, as if it were **stored locally**.

In [4]:
PATH = 's3://p9-data-sg'
PATH_Data = PATH+'/data_test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)


PATH:        s3://p9-data-sg
PATH_Data:   s3://p9-data-sg/data_test
PATH_Result: s3://p9-data-sg/Results

## Data processing

### Loading the data

In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)


In [6]:
images.show(5)


+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p9-data-sg/d...|2024-11-29 13:23:00|  5492|[FF D8 FF E0 00 1...|
|s3://p9-data-sg/d...|2024-11-29 13:23:03|  5482|[FF D8 FF E0 00 1...|
|s3://p9-data-sg/d...|2024-11-29 13:22:59|  5479|[FF D8 FF E0 00 1...|
|s3://p9-data-sg/d...|2024-11-29 13:23:02|  5467|[FF D8 FF E0 00 1...|
|s3://p9-data-sg/d...|2024-11-29 13:23:00|  5454|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>I add a column containing the **label** of each image, taken from the name<br />
    of its parent folder in the **path**; only these two columns are displayed</u>:

In [7]:
# The label is the second-to-last element of the path (the parent folder name)
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print directly and return None, so they are not wrapped in print()
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+----------------------------------------------------+-------------+
|path                                                |label        |
+----------------------------------------------------+-------------+
|s3://p9-data-sg/data_test/orange_sample/92_100.jpg  |orange_sample|
|s3://p9-data-sg/data_test/orange_sample/r_46_100.jpg|orange_sample|
|s3://p9-data-sg/data_test/orange_sample/83_100.jpg  |orange_sample|
|s3://p9-data-sg/data_test/orange_sample/r_37_100.jpg|orange_sample|
|s3://p9-data-sg/data_test/orange_sample/93_100.jpg  |orange_sample|
+----------------------------------------------------+-------------+
only showing top 5 rows
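To make the label extraction concrete, here is a minimal plain-Python sketch of the same idea as `element_at(split(path, '/'), -2)`: split the path on `/` and take the second-to-last element. The helper name `label_from_path` is hypothetical and is not part of the notebook.

```python
def label_from_path(path: str) -> str:
    """Return the parent folder name of a file path, used here as the label.

    Hypothetical helper mirroring element_at(split(path, '/'), -2).
    """
    parts = path.split("/")
    # parts[-1] is the file name, parts[-2] is the folder that contains it
    return parts[-2]


example_path = "s3://p9-data-sg/data_test/orange_sample/92_100.jpg"
label = label_from_path(example_path)
print(label)
```

This only works because the dataset stores each class in its own folder; a different folder layout would need a different index.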

### Preparing the model

In [8]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))


In [9]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)


In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())


In [11]:
new_model.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [12]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model


### Loading the images and featurizing them with a pandas UDF
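As a rough illustration of what the preprocessing in this section produces, the sketch below reproduces the pixel scaling that MobileNetV2's `preprocess_input` applies (values in [0, 255] mapped to [-1, 1]) and the stacking of individual images into one batch. It uses NumPy only; `scale_to_unit_range` is a hypothetical stand-in, and the zero-filled arrays are placeholders for decoded images.

```python
import numpy as np


def scale_to_unit_range(arr: np.ndarray) -> np.ndarray:
    """Map pixel values from [0, 255] to [-1, 1] (hypothetical stand-in)."""
    return arr.astype("float32") / 127.5 - 1.0


# Three sample pixel values: darkest, mid-grey, brightest
pixels = np.array([[[0.0, 127.5, 255.0]]], dtype="float32")
scaled = scale_to_unit_range(pixels)

# Placeholder "images" of the shape expected by the model: 224 x 224 x 3
fake_images = [np.zeros((224, 224, 3), dtype="float32") for _ in range(2)]
# np.stack adds a leading batch dimension, as in featurize_series
batch = np.stack([scale_to_unit_range(img) for img in fake_images])
print(scaled, batch.shape)
```

The batch dimension matters because the model predicts on a 4-D tensor of shape (batch, height, width, channels), not on single images.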

In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # Stack the preprocessed images into one batch (avoids shadowing the built-in `input`)
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
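The benefit of the Scalar Iterator pattern can be sketched without Spark: the expensive model load happens once per iterator, and every batch that follows reuses it. In this hedged example, `load_model` and `featurize_batches` are hypothetical stand-ins, plain lists replace pandas Series, and doubling each value replaces the real prediction.

```python
from typing import Callable, Iterator, List

load_count = 0  # how many times the (fake) model has been built


def load_model() -> Callable[[List[float]], List[float]]:
    """Stand-in for an expensive model load (hypothetical)."""
    global load_count
    load_count += 1
    return lambda batch: [2 * x for x in batch]


def featurize_batches(batches: Iterator[List[float]]) -> Iterator[List[float]]:
    """Load the model once, then reuse it for every incoming batch."""
    model = load_model()
    for batch in batches:
        yield model(batch)


results = list(featurize_batches(iter([[1.0, 2.0], [3.0], [4.0, 5.0]])))
print(results, load_count)
```

Three batches are processed but the model is built only once, which is the overhead the comments in `featurize_udf` refer to.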




### Running the feature extraction

In [14]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")


In [15]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )


In [16]:
print(PATH_Result)


s3://p9-data-sg/Results

In [17]:
features_df.write.mode("overwrite").parquet(PATH_Result)


### Loading the saved data and validating the result

In [18]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')


In [19]:
df.head()


                                                path  ...                                           features
0  s3://p9-data-sg/data_test/orange_sample/r_36_1...  ...  [0.09886958, 0.0, 0.0, 0.0, 0.0, 0.11915819, 0...
1  s3://p9-data-sg/data_test/banana_sample/21_100...  ...  [1.3663095, 0.0051844656, 0.0, 0.0046235686, 0...
2  s3://p9-data-sg/data_test/orange_sample/r_4_10...  ...  [0.6908158, 0.0, 0.0, 0.0, 0.0, 0.055544835, 0...
3  s3://p9-data-sg/data_test/banana_sample/165_10...  ...  [0.8605738, 0.16744861, 0.0, 0.04298717, 0.592...
4  s3://p9-data-sg/data_test/orange_sample/93_100...  ...  [0.026866553, 0.0022831254, 0.0, 0.0, 0.0, 0.0...

[5 rows x 3 columns]

In [20]:
df.loc[0,'features'].shape


(1280,)

In [21]:
df.shape


(20, 3)
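The checks above inspect a single row; a slightly broader validation could confirm that every feature vector has the same length. The sketch below is one possible way to do that with NumPy. `feature_dims` is a hypothetical helper, and `fake_features` is synthetic data standing in for `df['features']` (20 vectors of length 1280, matching the shapes reported above).

```python
import numpy as np


def feature_dims(features) -> set:
    """Return the set of distinct vector lengths in a collection of feature vectors."""
    return {np.asarray(f).shape[0] for f in features}


# Synthetic stand-in for df['features']: 20 vectors of length 1280
fake_features = [np.zeros(1280, dtype="float32") for _ in range(20)]
dims = feature_dims(fake_features)
print(dims)
```

A result containing more than one length would suggest that some images were featurized differently and should be inspected before the PCA step.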

The files in Parquet format can also be seen on the S3 storage.

## Dimensionality reduction step with PCA
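One way to choose the number of principal components for a target share of explained variance is to work on the cumulative sum of the per-component variance ratios. The sketch below assumes such a vector of ratios is already available (for example from a single PCA fit with a large `k`); `components_for_variance` and the `ratios` values are hypothetical and purely illustrative.

```python
import numpy as np


def components_for_variance(explained_variance_ratio, target: float = 0.90) -> int:
    """Smallest number of leading components whose cumulative ratio reaches `target`."""
    cumulative = np.cumsum(explained_variance_ratio)
    # Index of the first cumulative value >= target, converted to a count
    k = int(np.searchsorted(cumulative, target)) + 1
    # If the target is never reached, keep all available components
    return min(k, len(cumulative))


# Illustrative ratios only; cumulative sums are 0.5, 0.75, 0.875, 0.9375, 1.0
ratios = np.array([0.5, 0.25, 0.125, 0.0625, 0.0625])
k = components_for_variance(ratios, target=0.90)
print(k)
```

Compared with refitting a PCA model for each candidate `k`, this needs only one fit, which may matter as the data volume grows.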

In [27]:
# Define the save paths
PATH = 's3://p9-data-sg'
PATH_Data = PATH + '/data_test'  # same input folder as in the first part of the notebook
PATH_Result_PCA = PATH + '/Results_PCA'  # output folder for the PCA results

# Check the paths
print('PATH:        ' + PATH)
print('PATH_Data:   ' + PATH_Data)
print('PATH_Results_PCA: ' + PATH_Result_PCA)

PATH:        s3://p9-data-sg
PATH_Data:   s3://p9-data-sg/data_test
PATH_Results_PCA: s3://p9-data-sg/Results_PCA

In [28]:
# Library imports
from pyspark.ml.feature import VectorAssembler, PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# Convert the feature arrays to dense vectors
df['features'] = df['features'].apply(lambda x: Vectors.dense(x))
df_spark = spark.createDataFrame(df)  # Convert to a PySpark DataFrame

# VectorAssembler gathers the input columns into a single vector column
vecAssembler = VectorAssembler(inputCols=["features"], outputCol="features_vec")
df_spark = vecAssembler.transform(df_spark)

# Get the total number of features
total_features = len(df_spark.select("features").first()[0])

# Apply PCA iteratively, increasing k until 90% of the variance is explained
explained_variance = 0.0
k = 0
while explained_variance < 0.90 and k < total_features:
    k += 1
    pca = PCA(k=k, inputCol="features_vec", outputCol="pcaFeatures")
    model = pca.fit(df_spark)  # Fit the PCA model
    explained_variance = sum(model.explainedVariance)  # Variance explained by the k components

# Transform the data with the PCA model
result = model.transform(df_spark)

# Drop the 'features_vec' column after the transformation
result = result.drop('features_vec')

# UDF to convert vectors to lists
vector_to_list = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))
result = result.withColumn("features", vector_to_list(result["features"]))
result = result.withColumn("pcaFeatures", vector_to_list(result["pcaFeatures"]))

# Display the results
result.show()

# Save in Parquet format
result.write.mode('overwrite').parquet(PATH_Result_PCA)

# Load the results into pandas and display them
df_parquet = pd.read_parquet(PATH_Result_PCA, engine='pyarrow')
print(df_parquet.head())



+--------------------+-------------+--------------------+--------------------+
|                path|        label|            features|         pcaFeatures|
+--------------------+-------------+--------------------+--------------------+
|s3://p9-data-sg/d...|orange_sample|[0.09886958, 0.0,...|[9.506975, -0.876...|
|s3://p9-data-sg/d...|banana_sample|[1.3663095, 0.005...|[-7.7182097, 13.6...|
|s3://p9-data-sg/d...|orange_sample|[0.6908158, 0.0, ...|[10.241321, 0.466...|
|s3://p9-data-sg/d...|banana_sample|[0.8605738, 0.167...|[-6.806244, 12.09...|
|s3://p9-data-sg/d...|orange_sample|[0.026866553, 0.0...|[11.237363, 0.942...|
|s3://p9-data-sg/d...|orange_sample|[0.8680287, 0.0, ...|[11.004317, 0.734...|
|s3://p9-data-sg/d...|orange_sample|[0.84602123, 0.0,...|[10.886948, 0.328...|
|s3://p9-data-sg/d...|orange_sample|[0.85569197, 0.00...|[9.487195, 0.7097...|
|s3://p9-data-sg/d...|orange_sample|[0.013146974, 0.0...|[11.076643, 0.844...|
|s3://p9-data-sg/d...|banana_sample|[0.9155784, 0.02