# P9 – Déploiement d’un pipeline de featurisation d’images sur AWS EMR (PySpark)

**Sommaire** :

**1. Préambule**<br />
&emsp;1.1 Problématique<br />
&emsp;1.2 Objectifs dans ce projet<br />
&emsp;1.3 Déroulement des étapes du projet<br />
**2. Choix techniques généraux retenus**<br />
&emsp;2.1 Calcul distribué<br />
&emsp;2.2 Transfert Learning<br />
**3. Déploiement de la solution en local**<br />
&emsp;3.1 Environnement de travail<br />
&emsp;3.2 Installation de Spark<br />
&emsp;3.3 Installation des packages<br />
&emsp;3.4 Import des librairies<br />
&emsp;3.5 Définition des PATH pour charger les images et enregistrer les résultats<br />
&emsp;3.6 Création de la SparkSession<br />
&emsp;3.7 Traitement des données<br />
&emsp;&emsp;3.7.1 Chargement des données<br />
&emsp;&emsp;3.7.2 Préparation du modèle<br />
&emsp;&emsp;3.7.3 Définition du processus de chargement des images et application <br />
&emsp;&emsp;&emsp;&emsp;&emsp;de leur featurisation à travers l'utilisation de pandas UDF<br />
&emsp;&emsp;3.7.4 Exécution des actions d'extractions de features<br />
&emsp;3.8 Chargement des données enregistrées et validation du résultat<br />
&emsp;3.9 Réduction de dimension par PCA en PySpark<br />

# 1. Préambule

## 1.1. Problématique

La très jeune start-up de l'AgriTech, nommée "**Fruits**!", cherche à proposer des solutions innovantes pour la récolte des fruits. La volonté de l’entreprise est de préserver la biodiversité des fruits en permettant des traitements spécifiques pour chaque espèce de fruits en développant des robots cueilleurs intelligents. La start-up souhaite dans un premier temps se faire connaître en mettant à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit. Pour la start-up, cette application permettrait de sensibiliser le grand public à la biodiversité des fruits et de mettre en place une première version du moteur de classification des images de fruits.
De plus, le développement de l’application mobile permettra de construire une première version de l'architecture **Big Data** nécessaire.

## 1.2. Objectifs dans ce projet

1. Développer une première chaîne de traitement des données qui comprendra le **preprocessing** et une étape de **réduction de dimension**.
2. Tenir compte du fait que <u>le volume de données va augmenter très rapidement</u> après la livraison de ce projet, ce qui implique de:
 - Déployer le traitement des données dans un environnement **Big Data**
 - Développer les scripts en **pyspark** pour effectuer du **calcul distribué**

## 1.3. Déroulement des étapes du projet

Le projet va être réalisé en 2 temps, dans deux environnements différents. Nous allons dans un premier temps développer et exécuter notre code en local, en travaillant sur un nombre limité d'images à traiter.

Une fois les choix techniques validés, nous déploierons notre solution dans un environnement Big Data en mode distribué.

<u>Pour cette raison, ce projet sera divisé en 3 parties</u>:
1. Liste des choix techniques généraux retenus
2. Déploiement de la solution en local
3. Déploiement de la solution dans le cloud

# 2. Choix techniques généraux retenus

## 2.1 Calcul distribué

L’énoncé du projet nous impose de développer des scripts en **pyspark** afin de <u>prendre en compte l’augmentation très rapide du volume de donné après la livraison du projet</u>.

Pour comprendre rapidement et simplement ce qu’est **pyspark** et son principe de fonctionnement, nous vous conseillons de lire cet article : [PySpark : Tout savoir sur la librairie Python](https://datascientest.com/pyspark)

<u>Le début de l’article nous dit ceci </u>:« *Lorsque l’on parle de traitement de bases de données sur python, on pense immédiatement à la librairie pandas. Cependant, lorsqu’on a affaire à des bases de données trop massives, les calculs deviennent trop lents. Heureusement, il existe une autre librairie python, assez proche de pandas, qui permet de traiter des très grandes quantités de données : PySpark.Apache Spark est un framework open-source développé par l’AMPLab de UC Berkeley permettant de traiter des bases de données massives en utilisant le calcul distribué, technique qui consiste à exploiter plusieurs unités de calcul réparties en clusters au profit d’un seul projet afin de diviser le temps d’exécution d’une requête.Spark a été développé en Scala et est au meilleur de ses capacités dans son langage natif. Cependant, la librairie PySpark propose de l’utiliser avec le langage Python, en gardant des performances similaires à des implémentations en Scala.Pyspark est donc une bonne alternative à la librairie pandas lorsqu’on cherche à traiter des jeux de données trop volumineux qui entraînent des calculs trop chronophages.* »

Comme nous le constatons, **pySpark** est un moyen de communiquer avec **Spark** via le langage **Python**.**Spark**, quant à lui, est un outil qui permet de gérer et de coordonner l'exécution de tâches sur des données à travers un groupe d'ordinateurs. <u>Spark (ou Apache Spark) est un framework open source de calcul distribué in-memory pour le traitement et l'analyse de données massives</u>.

Un autre [article très intéressant et beaucoup plus complet pour comprendre le **fonctionnement de Spark**](https://www.veonum.com/apache-spark-pour-les-nuls/), ainsi que le rôle des **Spark Session** que nous utiliserons dans ce projet.

<u>Voici également un extrait</u>:

*Les applications Spark se composent d’un pilote (« driver process ») et de plusieurs exécuteurs (« executor processes »). Il peut être configuré pour être lui-même l’exécuteur (local mode) ou en utiliser autant que nécessaire pour traiter l’application, Spark prenant en charge la mise à l’échelle automatique par une configuration d’un nombre minimum et maximum d’exécuteurs.*

![Schéma de Spark](img/spark-schema.png)

*Le driver (parfois appelé « Spark Session ») distribue et planifie les tâches entre les différents exécuteurs qui les exécutent et permettent un traitement réparti. Il est le responsable de l’exécution du code sur les différentes machines.

Chaque exécuteur est un processus Java Virtual Machine (JVM) distinct dont il est possible de configurer le nombre de CPU et la quantité de mémoire qui lui est alloué. Une seule tâche peut traiter un fractionnement de données à la fois.*

Dans les deux environnements (Local et Cloud) nous utiliserons donc **Spark** et nous l’exploiterons à travers des scripts python grâce à **PySpark**.

Dans la <u>version locale</u> de notre script nous **simulerons le calcul distribué** afin de valider que notre solution fonctionne.Dans la <u>version cloud</u> nous **réaliserons les opérations sur un cluster de machine**.

## 2.2 Transfert Learning

L'énoncé du projet nous demande également de réaliser une première chaîne de traitement des données qui comprendra le preprocessing et une étape de réduction de dimension. Il est également précisé qu'il n'est pas nécessaire d'entraîner un modèle pour le moment.

Nous décidons de partir sur une solution de **transfert learning**. Simplement, le **transfert learning** consiste à utiliser la connaissance déjà acquise par un modèle entraîné (ici **MobileNetV2**) pour l'adapter à notre problématique. Nous allons fournir au modèle nos images, et nous allons <u>récupérer l'avant dernière couche</u> du modèle.En effet la dernière couche de modèle est une couche softmax qui permet la classification des images ce que nous ne souhaitons pas dans ce projet. L'avant dernière couche correspond à un **vecteur réduit** de dimension (1,1,1280). Cela permettra de réaliser une première version du moteur pour la classification des images des fruits.

**MobileNetV2** a été retenu pour sa <u>rapidité d'exécution</u>, particulièrement adaptée pour le traitement d'un gros volume de données ainsi que la <u>faible dimensionnalité du vecteur de caractéristique en sortie</u> (1,1,1280)

# 3. Déploiement de la solution en local

## 3.1 Environnement de travail

Pour des raisons de simplicité, nous développons dans un environnement Linux Unbuntu (exécuté depuis une machine Windows dans une machine virtuelle)
* Pour installer une machine virtuelle :  https://www.malekal.com/meilleurs-logiciels-de-machine-virtuelle-gratuits-ou-payants/

## 3.2 Installation de Spark

[La première étape consiste à installer Spark ](https://computingforgeeks.com/how-to-install-apache-spark-on-ubuntu-debian/)

## 3.3 Installation des packages

<u>On installe ensuite à l'aide de la commande **pip** <br />
les packages qui nous seront nécessaires</u> :

In [None]:
!pip install Pandas pillow tensorflow pyspark pyarrow

## 3.4 Import des librairies

In [None]:
import os
import io
import shutil
import numpy as np
import pandas as pd
import random

from PIL import Image

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.models import Model

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, element_at

## 3.5 Définition des PATH pour charger les images et enregistrer les résultats

Dans cette version locale nous partons du principe que les données sont stockées dans le même répertoire que le notebook. Nous n'utilisons qu'un extrait de **300 images** à traiter dans cette première version en local.L'extrait des images à charger est stockée dans le dossier **Test1**. Nous enregistrerons le résultat de notre traitement dans le dossier "**Results_Local**"

In [None]:
BASE_PATH = os.getcwd()

PATH_DATA = os.path.join(BASE_PATH, "data", "Test1")
PATH_RESULT = os.path.join(BASE_PATH, "data", "Results_local")

os.makedirs(PATH_RESULT, exist_ok=True)

print(f"PATH_DATA   : {PATH_DATA}")
print(f"PATH_RESULT : {PATH_RESULT}")

## 3.6 Création de la SparkSession

L’application Spark est contrôlée grâce à un processus de pilotage (driver process) appelé **SparkSession**. Une instance de **SparkSession** est la façon dont Spark exécute les fonctions définies par l’utilisateur dans l’ensemble du cluster. Une SparkSession correspond toujours à une application Spark.

Ici nous créons une session spark en spécifiant dans l'ordre :
  1. un **nom pour l'application**, qui sera affichée dans l'interface utilisateur Web Spark "**P8**"
  2. que l'application doit s'exécuter **localement**. Nous ne définissons pas le nombre de cœurs à utiliser (comme .master('local[4]) pour 4 cœurs à utiliser), nous utiliserons donc tous les cœurs disponibles dans notre processeur.
  3. une option de configuration supplémentaire permettant d'utiliser le **format "parquet"** que nous utiliserons pour enregistrer et charger le résultat de notre travail.
  4. vouloir **obtenir une session spark** existante ou si aucune n'existe, en créer une nouvelle

Nous exécutons Spark en mode local avec l’ensemble des cœurs CPU afin de simuler un calcul distribué tout en restant dans un environnement de développement.

In [None]:
spark = (
    SparkSession.builder                                        
    .appName("P9_Fruits_Image_Featurization")                   # définition du nom de l'application spark (apparait dans le spark UI, les logs, cluster manager si cloud)
                                                                # Le nom de l’application permet d’identifier clairement le job Spark dans l’interface de monitoring
    .master("local[*]")                                         # local = execution sur une seule machine ; [*] = utilise tous les coeurs CPU dispo
    .config("spark.sql.parquet.writeLegacyFormat", "true")      # utilisation d'un format Parquet compatible avec d'anciennes implémentations
    .getOrCreate()                                              # si il existe une session il l'utilise sinon il en crée une nouvelle
)

- 127.0.1.1 = adresse loopback (la machine qui parle à elle-même)
- Spark préfère une vraie adresse réseau, même en local
- 10.0.2.15 = IP interne de ta VM (VirtualBox / réseau NAT)

Il y a deux niveaux dans Spark :
- Niveau moderne (celui qu'on utilise)
    - SparkSession
    - DataFrames
    - Spark SQL
- Niveau bas niveau (historique)
    - SparkContext
    - RDD
    - gestion directe des partitions

<u>Nous créons également la variable "**sc**" qui est un **SparkContext** issue de la variable **spark**</u> :

In [None]:
# À partir de la SparkSession que j’ai déjà créée, je récupère le moteur interne Spark.
sc = spark.sparkContext

<u>Affichage des informations de Spark en cours d'execution</u> :

In [None]:
spark

La SparkSession est correctement initialisée en mode local, avec un SparkContext actif. L’interface Spark UI est accessible, ce qui confirme que l’application Spark est bien lancée et prête à exécuter des transformations distribuées.

## 3.7 Traitement des données




Dans la suite de notre flux de travail, nous allons successivement:
1. Préparer nos données
    1. Importer les images dans un dataframe **pandas UDF**
    2. Associer aux images leur **label**
    3. Préprocesser en **redimensionnant nos images pour  qu'elles soient compatibles avec notre modèle**
2. Préparer notre modèle
    1. Importer le modèle **MobileNetV2**
    2. Créer un **nouveau modèle** dépourvu de la dernière couche de MobileNetV2
3. Définir le processus de chargement des images et l'application de leur featurisation à travers l'utilisation de pandas UDF
3. Exécuter les actions d'extraction de features
4. Enregistrer le résultat de nos actions
5. Tester le bon fonctionnement en chargeant les données enregistrées

### 3.7.0 Création du jeu de données

Pour ce projet, nous utilisons la version Fruits-360 en 100×100 pixels, qui est stable, bien documentée et adaptée à un pipeline de traitement distribué.
La version “original size” étant explicitement indiquée comme non finalisée par les auteurs, nous ne l’utilisons pas afin de rester dans un cadre maîtrisé et conforme aux objectifs du projet.

In [None]:
# Dossier source : version 100x100 stable du dataset
SOURCE_DIR = os.path.join(
    BASE_PATH,
    "data/fruits-360_dataset/fruits-360/Training"
)

TARGET_DIR = PATH_DATA

In [None]:
NB_CLASSES = 30          # nombre de classes à garder
NB_IMAGES_PER_CLASS = 10 # images par classe (~300 images au total)


In [None]:
os.makedirs(TARGET_DIR, exist_ok=True)

In [None]:
# Récupération de toutes les classes disponibles
all_classes = sorted([
    d for d in os.listdir(SOURCE_DIR)
    if os.path.isdir(os.path.join(SOURCE_DIR, d))
])

# Sélection des classes
selected_classes = random.sample(all_classes, NB_CLASSES)

print(f"Classes sélectionnées ({len(selected_classes)}) : {selected_classes}")

Copie des images

In [None]:
total_copied = 0

for cls in selected_classes:
    src_dir = os.path.join(SOURCE_DIR, cls)
    tgt_dir = os.path.join(TARGET_DIR, cls)

    os.makedirs(tgt_dir, exist_ok=True)

    # Liste complète des images de la classe
    images = [
        f for f in os.listdir(src_dir)
        if f.lower().endswith(".jpg")
    ]

    # Sélection aléatoire des images de la classe
    selected_images = random.sample(images, NB_IMAGES_PER_CLASS)

    for img in selected_images:
        shutil.copy(
            os.path.join(src_dir, img),
            os.path.join(tgt_dir, img)
        )

    total_copied += len(selected_images)
    print(f"{cls} : {len(selected_images)} images copiées")

print(f"\nTotal images copiées : {total_copied}")

### 3.7.1 Chargement des données

Les images sont chargées au format binaire, ce qui offre, plus de souplesse dans la façon de prétraiter les images. Avant de charger les images, nous spécifions que nous voulons charger uniquement les fichiers dont l'extension est **jpg**. Nous indiquons également de charger tous les objets possibles contenus dans les sous-dossiers du dossier communiqué.

In [None]:
images = (
    spark.read.format("binaryFile")         # spark lit des fichiers binaires
    .option("pathGlobFilter", "*.jpg")      # ne charge que les fichiers .jpg
    .option("recursiveFileLookup", "true")  # parcourt tous les sous-dossiers récursivement (ne lit pas que le dossier racine)
    .load(PATH_DATA)                        # Spark scanne le dossier, crée un df distribué avec une ligne par image
)

Chaque ligne contient : 
| Colonne            | Contenu                    |
| ------------------ | -------------------------- |
| `path`             | chemin complet du fichier  |
| `modificationTime` | date de dernière modif     |
| `length`           | taille en octets           |
| `content`          | image en binaire (`bytes`) |


<u>Je ne conserve que le **path** de l'image et j'ajoute une colonne contenant les **labels** de chaque image</u> :

In [None]:
# Extraction du label depuis le chemin
images = images.withColumn(
    "label",
    element_at(split(col("path"), "/"), -2)
)

In [None]:
# Schéma du DataFrame
images.printSchema()

In [None]:
# Aperçu des données
images.select("path", "label").show(10, truncate=False)

### 3.7.2 Préparation du modèle

Je vais utiliser la technique du **transfert learning** pour extraire les features des images. J'ai choisi d'utiliser le modèle **MobileNetV2** pour sa rapidité d'exécution comparée à d'autres modèles comme *VGG16* par exemple.

Pour en savoir plus sur la conception et le fonctionnement de MobileNetV2, je vous invite à lire [cet article](https://towardsdatascience.com/review-mobilenetv2-light-weight-model-image-classification-8febb490e61c).

<u>Voici le schéma de son architecture globale</u> : 

![Architecture de MobileNetV2](img/mobilenetv2_architecture.png)

Il existe une dernière couche qui sert à classer les images selon 1000 catégories que nous ne voulons pas utiliser. L'idée dans ce projet est de récupérer le **vecteur de caractéristiques de dimensions (1,1,1280)** qui servira, plus tard, au travers d'un moteur de classification à reconnaitre les différents fruits du jeu de données.

Comme d'autres modèles similaires, **MobileNetV2**, lorsqu'on l'utilise en incluant toutes ses couches, attend obligatoirement des images de dimension (224,224,3). Nos images étant toutes de dimension (100,100,3), nous devrons simplement les **redimensionner** avant de les confier au modèle.

<u>Dans l'odre</u> :
 1. Nous chargeons le modèle **MobileNetV2** avec les poids **précalculés** issus d'**imagenet** et en spécifiant le format de nos images en entrée
 2. Nous créons un nouveau modèle avec:
  - <u>en entrée</u> : l'entrée du modèle MobileNetV2
  - <u>en sortie</u> : l'avant dernière couche du modèle MobileNetV2

In [None]:
# Chargement du modèle pré-entraîné SANS la tête de classification
base_model = MobileNetV2(           # Construction de l'architecture MobileNetV2
    weights="imagenet",             # Initialisation du modèle avec des poids déjà appris sur ImageNet
    include_top=False,              # Exclusion de la dernière partie du modèle qui correspond à la classification qu'on ne veut pas 
    input_shape=(224, 224, 3),      # Toutes les images envoyées au modèle auront cette forme (contrainte technique du modèle)
    pooling="avg"                   # Sortie (1280,): on supprime les dimensions spatiales, chaque image devient un vecteur de 1280 nombres
)

base_model.trainable = False        # On gèle les poids du modèle (pas d'entrainement, pas de mise a jour)

In [None]:
# Vérification de la sortie 
base_model.summary()

**Résumé** : 

- Entrée du modèle: 
    - `input_layer (InputLayer)` | `Output Shape: (None, 224, 224, 3)`
    - Le modèle attend bien des images 224 x 224 avec 3 canaux de couleur / None = taille du batch inconnue à l'avance.

- Corps du modèle : 
    - Longue liste de couches comme `Conv2D`, `DepthwiseConv2D`, `BatchNormalization`, `ReLU`... 
    - Ce bloc correspond à l'extraction progressive de caractéristiques visuelles. 
    - Il y a beaucoup de couches parce que MobileNetV2 utilise des bloc inversés résiduels, des convolutions depthwise, des skip connections -> ce qui sert à réduire le nombre de paramètres, garder de bonnes performances et être rapide. 

- Sortie : 
    - `global_average_pooling (GlobalAveragePooling2D)` | `Output Shape: (None, 1280)`
    - Chaque image devient un vecteur de 1280 valeurs, sans information spatiale. 

**Les paramètres** :

- ~2.2M de paramètres dont aucun entrainement (prouve bien que le transfert learning est figé)

Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids. Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser ensuite les poids aux différents workeurs.

In [None]:
# On récupère tous les paramètres numériques du modèle sous forme de listes de tableaux numpy
# Ces données doivent être envoyées à tous les workers de manière efficace
broadcast_weights = sc.broadcast(base_model.get_weights())

<u>Mettons cela sous forme de fonction</u> :

In [None]:
def model_fn():
    """
    Creates a MobileNetV2 model (feature extractor)
    with broadcasted ImageNet weights.
    """
    model = MobileNetV2(
        weights=None,
        include_top=False,
        input_shape=(224, 224, 3),
        pooling="avg"
    )
    
    model.set_weights(broadcast_weights.value)  # Injection des poids broadcastés -> chaque worker a son propre modèle avec les bons poids
    model.trainable = False     # Le modèle ne sera jamais entrainé 
    
    return model

Le modèle est chargé une seule fois sur le driver, puis ses poids sont diffusés aux workers Spark à l’aide d’un broadcast. Chaque worker reconstruit localement le modèle avec ces poids, ce qui évite des rechargements coûteux et permet une exécution distribuée efficace.

### 3.7.3 Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

Le but de cette section est de prendre des images stockées sous forme binaire dans un DataFrame Spark et les transformer en vecteurs de caractéristiques,en parallèle, sur plusieurs workers.

MobileNetV2 est un modèle Python / TensorFlow et Spark est distribué. Il faut donc un pont entre Spark, Pandas et TensorFlow => Pandas UDF. 

<u>L'empilement des appels est la suivante</u> : 
Pandas UDF 
- featuriser une série d'images pd.Series 
- prétraiter une image

Spark :
- découpe les données
- envoie des batches à chaque worker
- chaque batch est traité via Pandas + TensorFlow

In [None]:
# Import nécessaires
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, FloatType
from typing import Iterator

In [None]:
# Préprocessing image
def preprocess(content):
    """
    Preprocess raw image bytes for MobileNetV2 inference.
    """
    img = Image.open(io.BytesIO(content)).convert("RGB")
    img = img.resize((224, 224))
    arr = img_to_array(img)     # convertit une image PIL en tableau numpy (224, 224, 3)
    arr = preprocess_input(arr) # applique la normalisation spécifique MobileNetV2
    return arr

#Featurisation d'un batch Pandas
def featurize_series(model, content_series):
    """
    Featurize a pandas Series of image bytes using the provided model.
    Returns a pandas Series of feature vectors.
    """
    batch = np.stack(content_series.map(preprocess))
    features = model.predict(batch, verbose=0)

    return pd.Series(features.tolist())

# Pandas UDF (Spark – Scalar Iterator)
@pandas_udf(ArrayType(FloatType()))
def featurize_udf(
    content_series_iter: Iterator[pd.Series]
) -> Iterator[pd.Series]:
    """
    Scalar Iterator Pandas UDF for distributed image featurization.
    """
    model = model_fn()  # chargé UNE fois par worker

    for content_series in content_series_iter:
        yield featurize_series(model, content_series)   # pour chaque batch : on applique MobileNetV2, on renvoie les features et Spark récupère le résultat

### 3.7.4 Exécution des actions d'extraction de features

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), peuvent rencontrer des erreurs de type Out Of Memory (OOM). Si vous rencontrez de telles erreurs dans la cellule ci-dessous, essayez de réduire la taille du lot Arrow via 'maxRecordsPerBatch'

Je n'utiliserai pas cette commande dans ce projet et je laisse donc la commande en commentaire.

In [None]:
# À activer uniquement en cas d'erreurs OOM sur de très gros volumes
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.

<u>REMARQUE</u> : Cela peut prendre beaucoup de temps, tout dépend du volume de données à traiter. 

Notre jeu de données de **Test** contient **22819 images**. 
Cependant, dans l'exécution en mode **local**, 
nous <u>traiterons un ensemble réduit de **300 images**</u>.

In [None]:
features_df = (
    images              # DataFrame Spark initial (contient path, content, label)
    .repartition(20)    # nombre de partitions explicite (local / cluster)
    .select(
        col("path"),    # on conserve le chemin de l'image
        col("label"),   # on conserve le label de l'image
        featurize_udf(col("content")).alias("features")     # pour chaque image, on applique la Pandas UDF sur la colonne content
    )
)

Concrètement :

- Spark :
    - regroupe les lignes par batch
    - envoie les batches aux workers
- Chaque worker :
    - charge MobileNetV2 une seule fois
    - applique le preprocessing
    - extrait les features
- Le résultat :
    - une colonne features
    - contenant un vecteur de 1280 floats

In [None]:
# Vérification rapide du schéma
features_df.printSchema()

Rappel du PATH où seront inscrits les fichiers au format "**parquet**" contenant nos résultats, à savoir, un DataFrame contenant 3 colonnes:
 1. Path des images
 2. Label de l'image
 3. Vecteur de caractéristiques de l'image

In [None]:
print(f"Résultats enregistrés dans : {PATH_RESULT}")

Execution : 

In [None]:
(
    features_df
    .write
    .mode("overwrite")
    .parquet(PATH_RESULT)
)

## 3.8 Chargement des données enregistrées et validation du résultat


<u>On charge les données</u> :

In [None]:
features_df = spark.read.parquet(PATH_RESULT)

In [None]:
# Vérification du schéma 
features_df.printSchema()


In [None]:
# Vérification dimension du vecteur
from pyspark.sql.functions import size

features_df.select(size("features").alias("features_dim")).show(5)

<u>On affiche les 5 premières lignes du DataFrame</u> :

In [None]:
features_df.select("label", "features").show(3, truncate=False)

Nous venons de valider le processus sur un jeu de données allégé en local où nous avons simulé un cluster de machines en répartissant la charge de travail sur différents cœurs de processeur au sein d'une même machine.

Nous allons maintenant généraliser le processus en déployant notre solution sur un réel cluster de machines et nous travaillerons désormais sur la totalité des 22819 images de notre dossier "Test".

## 3.9 Réduction de dimension par PCA en PySpark

### 3.9.1 Vectorisation des features

In [None]:
from pyspark.ml.functions import array_to_vector

In [None]:
features_vec_df = features_df.withColumn(
    "features_vec",
    array_to_vector("features")
)

In [None]:
features_vec_df.select("features_vec").show(1, truncate=False)

### 3.9.2 Application de la PCA

In [None]:
from pyspark.ml.feature import PCA

In [None]:
pca_full = PCA(
    k=1280,   # dimension originale des embeddings MobileNetV2
    inputCol="features_vec",
    outputCol="features_pca_full"
)

pca_full_model = pca_full.fit(features_vec_df)

### 3.9.3 Analyse de la variance expliquée

In [None]:
explained_variance = pca_full_model.explainedVariance.toArray()

In [None]:
df_var = pd.DataFrame({
    "component": np.arange(1, len(explained_variance) + 1),
    "explained_variance": explained_variance,
})

df_var["cum_variance"] = df_var["explained_variance"].cumsum()

df_var.head(10)

In [None]:
k_80 = (df_var["cum_variance"] >= 0.80).idxmax() + 1
k_90 = (df_var["cum_variance"] >= 0.90).idxmax() + 1

print("k pour 80% de variance :", k_80)
print("k pour 90% de variance :", k_90)

In [None]:
import matplotlib.pyplot as plt

# On limite l'affichage aux 100 premières composantes pour la lisibilité
df_plot = df_var.iloc[:100]

plt.figure(figsize=(14, 7))

# Barres : variance expliquée par composante
plt.bar(df_plot["component"], df_plot["explained_variance"], alpha=0.7, label="Variance expliquée")

# Courbe : variance cumulée
plt.plot(df_plot["component"], df_plot["cum_variance"], 
         color="red", marker="o", linewidth=2, label="Variance cumulée")

# Lignes horizontales de seuil
plt.axhline(y=0.8, color="green", linestyle="--", label="Seuil 80%")
plt.axhline(y=0.9, color="purple", linestyle="--", label="Seuil 90%")

plt.xlabel("Composantes principales")
plt.ylabel("Variance expliquée")
plt.title("Scree plot – PCA sur les embeddings MobileNetV2")
plt.legend()
plt.grid(alpha=0.3)

plt.show()


L’analyse de la variance expliquée par la PCA montre que les premières composantes principales concentrent l’essentiel de l’information contenue dans les embeddings extraits par MobileNetV2.
En particulier, environ 80 % de la variance totale est expliquée par les ~50 premières composantes, et 90 % par les ~100 premières composantes.
Afin de réduire significativement la dimension tout en conservant l’essentiel de l’information discriminante, nous retenons un compromis de k = 50 composantes pour la suite du pipeline.

### 3.9.4 Application de la PCA finale avec un k justifié

In [None]:
pca = PCA(
    k=50,     
    inputCol="features_vec",
    outputCol="features_pca"
)

pca_model = pca.fit(features_vec_df)
features_pca_df = pca_model.transform(features_vec_df)

features_pca_df.select("features_pca").show(3, truncate=False)
