# Projet P9 : Réalisez un traitement dans un environnement Big Data sur le Cloud

## Contexte

Vous êtes Data Scientist dans une très jeune start-up de l'AgriTech, nommée "Fruits!", qui cherche à proposer des solutions innovantes pour la récolte des fruits.

La volonté de l’entreprise est de préserver la biodiversité des fruits en permettant des traitements spécifiques pour chaque espèce de fruits en développant des robots cueilleurs intelligents.

Votre start-up souhaite dans un premier temps se faire connaître en mettant à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.
Pour la start-up, cette application permettrait de sensibiliser le grand public à la biodiversité des fruits et de mettre en place une première version du moteur de classification des images de fruits.
De plus, le développement de l’application mobile permettra de construire une première version de l'architecture Big Data nécessaire.

<b> MISSION : </b>

Votre collègue Paul vous indique l’existence d’un document, formalisé par un alternant qui vient de quitter l’entreprise. Il a testé une première approche dans un environnement Big Data AWS EMR, à partir d’un jeu de données constitué des images de fruits et des labels associés. Le notebook réalisé par l’alternant servira de point de départ pour construire une partie de la chaîne de traitement des données.

Vous êtes donc chargé de vous approprier les travaux réalisés par l’alternant et de compléter la chaîne de traitement. L’important est de mettre en place les premières briques de traitement qui serviront lorsqu’il faudra passer à l’échelle en termes de volume de données !

## Etapes du projet :

<h4> Etape 1 : Environnement de travail et librairies </h4>

- Démarrage de la session Spark
- Importation des librairies
- Définition des PATH

<h4> Etape 2 : Traitement des données et modélisation </h4>

- Chargement des données
- Préparation du modèle
- Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF
- Exécutions des actions d'extractions de features
- Chargement des données enregistrées et validation du résultat
- Réduction de dimension PCA

## ETAPE 1 : Environnement de travail et librairies

### 1.1. Démarrage de la session Spark

Avant de commencer, il faut s'assurer d'utiliser le kernel pyspark.

En utilisant ce kernel, une session spark est créé à l'exécution de la première cellule.
Il n'est donc plus nécessaire d'exécuter le code "spark = (SparkSession ..." comme lors
de l'exécution de notre notebook en local sur notre VM Ubuntu.

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1729672982490_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1729672982490_0001,pyspark,idle,Link,Link,✔


### 1.2. Import des librairies

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

In [3]:
import pandas as pd
import numpy as np
import io
import os

import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.ml.linalg import Vectors, DenseVector
from pyspark.sql import Row
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.ml.feature import PCA

import boto3 # boto3 = bibliothèque AWS pour interagir avec S3

import matplotlib.pyplot as plt

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 1.3. Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [4]:
PATH = 's3://ct-p9-calculsdistribues'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://ct-p9-calculsdistribues
PATH_Data:   s3://ct-p9-calculsdistribues/Test
PATH_Result: s3://ct-p9-calculsdistribues/Results

## ETAPE 2 : Traitement des données et modélisation

### 2.1. Chargement des données

In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://ct-p9-calcul...|2024-10-21 08:34:59|  6257|[FF D8 FF E0 00 1...|
|s3://ct-p9-calcul...|2024-10-21 08:34:59|  6253|[FF D8 FF E0 00 1...|
|s3://ct-p9-calcul...|2024-10-21 08:34:59|  6250|[FF D8 FF E0 00 1...|
|s3://ct-p9-calcul...|2024-10-21 08:34:59|  6223|[FF D8 FF E0 00 1...|
|s3://ct-p9-calcul...|2024-10-21 08:34:59|  6215|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [7]:
images.select(split(images['path'], '/').alias('split_path')).show(1, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------------------------------------------------+
|split_path                                                        |
+------------------------------------------------------------------+
|[s3:, , ct-p9-calculsdistribues, Test, Onion White, r_179_100.jpg]|
+------------------------------------------------------------------+
only showing top 1 row

In [8]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+-----------------------------------------------------------+-----------+
|path                                                       |label      |
+-----------------------------------------------------------+-----------+
|s3://ct-p9-calculsdistribues/Test/Onion White/r_179_100.jpg|Onion White|
|s3://ct-p9-calculsdistribues/Test/Onion White/r_183_100.jpg|Onion White|
|s3://ct-p9-calculsdistribues/Test/Onion White/r_180_100.jpg|Onion White|
|s3://ct-p9-calculsdistribues/Test/Onion White/r_178_100.jpg|Onion White|
|s3://ct-p9-calculsdistribues/Test/Onion White/r_189_100.jpg|Onion White|
+-----------------------------------------------------------+-----------+
only showing top 5 rows

None

### 2.2. Préparation du modèle

In [9]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [10]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [13]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 2.3. Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [14]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



### 2.4. Exécutions des actions d'extractions de features

In [15]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://ct-p9-calculsdistribues/Results

In [18]:
features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 2.5. Chargement des données enregistrées et validation du résultat

In [19]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                           features
0  s3://ct-p9-calculsdistribues/Test/Onion White/...  ...  [0.0, 0.4194712, 0.0, 0.0, 0.0, 0.0, 0.3876464...
1  s3://ct-p9-calculsdistribues/Test/Onion White/...  ...  [0.16061136, 0.106646694, 0.0, 0.0, 0.07023956...
2  s3://ct-p9-calculsdistribues/Test/Onion White/...  ...  [0.037584778, 0.19835307, 0.0, 0.0, 0.82172316...
3  s3://ct-p9-calculsdistribues/Test/Granadilla/7...  ...  [1.5850376, 0.06203457, 0.0, 0.29234514, 0.016...
4  s3://ct-p9-calculsdistribues/Test/Granadilla/r...  ...  [1.0744823, 0.0, 0.0, 0.0, 0.004926945, 0.1679...

[5 rows x 3 columns]

In [21]:
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [22]:
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(400, 3)

In [23]:
df.columns

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Index(['path', 'label', 'features'], dtype='object')

### 2.6. Réduction de dimension PCA

In [24]:
# Fonction pour convertir les listes en vecteurs denses
def list_to_vector(row):
    return Row(path=row['path'], label=row['label'], features=Vectors.dense(row['features']))

# Appliquer la conversion à chaque ligne du DataFrame pandas
df_converted = df.apply(list_to_vector, axis=1)

# Créer un DataFrame PySpark à partir du DataFrame pandas converti
df_spark = spark.createDataFrame(df_converted)
df_spark.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)

In [25]:
# Définir le nombre de composantes principales
pca = PCA(k=4, inputCol="features", outputCol="pcaFeatures")

# Ajuster le modèle PCA sur le DataFrame
pca_model = pca.fit(df_spark)

# Appliquer la transformation PCA
df_pca = pca_model.transform(df_spark)

# Afficher les nouvelles caractéristiques après PCA
df_pca.select("pcaFeatures").show(truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------------------------------------------------------+
|pcaFeatures                                                                    |
+-------------------------------------------------------------------------------+
|[-2.832261277007301,-3.759914961760966,-17.572010500106057,6.636464007536399]  |
|[-4.560008600470341,-3.297084917180519,-5.825523115751458,1.9513899399127508]  |
|[-3.5718733659963484,-3.359986850408344,-7.316718621509712,3.06038691988881]   |
|[-9.264961988727297,3.0112557249768876,3.718332144704285,13.974251972972745]   |
|[-7.383848461180175,0.9578962800437563,3.6736921402690395,8.349619625313737]   |
|[-7.570820840962923,-3.199461608668071,2.2024531686203312,-5.284022511090555]  |
|[-7.863220299313799,-3.1039138572908365,2.2672645796905164,-5.463285528055552] |
|[0.31413462447099566,5.392654109777226,-1.852373239670365,5.967291755324481]   |
|[1.390945191373669,6.51136221041643,-0.5550846410359221,3.872546877403518]     |
|[2.868096414066

In [26]:
df_pca.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)
 |-- pcaFeatures: vector (nullable = true)

In [27]:
# Définir une fonction pour convertir les vecteurs sparse en dense
def sparse_to_dense(v):
    return v.toArray().tolist()

# UDF pour effectuer la conversion
sparse_to_dense_udf = udf(sparse_to_dense, ArrayType(DoubleType()))

# Appliquer la conversion sur la colonne 'pcaFeatures'
df_pca_dense = df_pca.withColumn("densePcaFeatures", sparse_to_dense_udf("pcaFeatures"))

# Extraire les deux premières composantes principales (PC1 et PC2) et les convertir en pandas
df_pca_pd = df_pca_dense.select(
    col("densePcaFeatures")[0].alias("PC1"),  # Première composante
    col("densePcaFeatures")[1].alias("PC2"),  # Deuxième composante
    col("label")  # Optionnel si tu veux colorer les points par catégorie
).toPandas()

# Affichage des 5 premières lignes pour vérification
print(df_pca_pd.head())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

        PC1       PC2        label
0 -2.832261 -3.759915  Onion White
1 -4.560009 -3.297085  Onion White
2 -3.571873 -3.359987  Onion White
3 -9.264962  3.011256   Granadilla
4 -7.383848  0.957896   Granadilla

In [28]:
import matplotlib.pyplot as plt

# Si les labels sont des chaînes, les convertir en valeurs numériques
label_mapping = {label: idx for idx, label in enumerate(df_pca_pd["label"].unique())}
df_pca_pd['label_numeric'] = df_pca_pd['label'].map(label_mapping)

# Optionnel : Si tu veux utiliser une palette de couleurs spécifique pour chaque label
colors = plt.cm.get_cmap('viridis', len(df_pca_pd["label"].unique()))

# Création du graphique en nuage de points avec Matplotlib
plt.figure(figsize=(10, 6))

# Utilisation des couleurs mappées en fonction des labels
scatter = plt.scatter(df_pca_pd["PC1"], df_pca_pd["PC2"], c=df_pca_pd["label_numeric"], cmap=colors, alpha=0.6)

# Ajouter des labels et un titre
plt.xlabel('Première composante principale (PC1)')
plt.ylabel('Deuxième composante principale (PC2)')
plt.title('Projection des points sur les deux premiers axes PCA')

# Créer une légende personnalisée avec les labels
handles, _ = scatter.legend_elements(prop="colors")
legend_labels = list(label_mapping.keys())  # Utiliser les noms originaux des labels
plt.legend(handles, legend_labels, loc="lower right", title="Types de fruits")

# Personnalisation de l'affichage
plt.xticks(fontsize=8)
plt.yticks(fontsize=8)
plt.grid(color='gray', linestyle='-.', linewidth=0.2)

# Sauvegarder le graphique dans /tmp/ et téléverser sur S3
plt.savefig("/tmp/pca_plot.png")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Créer une session S3
s3 = boto3.client('s3')

# Définir le bucket S3 et le chemin du fichier de destination
bucket_name = 'ct-p9-calculsdistribues'
destination_path = 'ACP/pca_plot.png'

# Téléverser le fichier
s3.upload_file('/tmp/pca_plot.png', bucket_name, destination_path)

print(f"Fichier téléversé sur s3://{bucket_name}/{destination_path}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Fichier téléversé sur s3://ct-p9-calculsdistribues/ACP/pca_plot.png