# Déployez un modèle dans le cloud


# Sommaire :

**1. Préambule**<br />
&emsp;1.1 Problématique<br />
&emsp;1.2 Objectifs dans ce projet<br />
&emsp;1.3 Déroulement des étapes du projet<br />
**2. Choix techniques généraux retenus**<br />
&emsp;2.1 Calcul distribué<br />
&emsp;2.2 Transfert Learning<br />
**3. Déploiement de la solution en local**<br />
&emsp;3.1 Environnement de travail<br />
&emsp;3.2 Installation de Spark<br />
&emsp;3.3 Installation des packages<br />
&emsp;3.4 Import des librairies<br />
&emsp;3.5 Définition des PATH pour charger les images et enregistrer les résultats<br />
&emsp;3.6 Création de la SparkSession<br />
&emsp;3.7 Traitement des données<br />
&emsp;&emsp;3.7.1 Chargement des données<br />
&emsp;&emsp;3.7.2 Préparation du modèle<br />
&emsp;&emsp;3.7.3 Définition du processus de chargement des images et application <br />
&emsp;&emsp;&emsp;&emsp;&emsp;de leur featurisation à travers l'utilisation de pandas UDF<br />
&emsp;&emsp;3.7.4 Exécution des actions d'extractions de features<br />
&emsp;3.8 Chargement des données enregistrées et validation du résultat<br />
**4. Déploiement de la solution sur le cloud**<br />
&emsp;4.1 Choix du prestataire cloud : AWS<br />
&emsp;4.2 Choix de la solution technique : EMR<br />
&emsp;4.3 Choix de la solution de stockage des données : Amazon S3<br />
**5. Conclusion**

# 1. Préambule

## 1.1 Problématique

La très jeune start-up de l'AgriTech, nommée "**Fruits**!", <br />
cherche à proposer des solutions innovantes pour la récolte des fruits.

La volonté de l’entreprise est de préserver la biodiversité des fruits <br />
en permettant des traitements spécifiques pour chaque espèce de fruits <br />
en développant des robots cueilleurs intelligents.

La start-up souhaite dans un premier temps se faire connaître en mettant <br />
à disposition du grand public une application mobile qui permettrait aux <br />
utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

Pour la start-up, cette application permettrait de sensibiliser le grand public <br />
à la biodiversité des fruits et de mettre en place une première version du moteur <br />
de classification des images de fruits.

De plus, le développement de l’application mobile permettra de construire <br />
une première version de l'architecture **Big Data** nécessaire.

## 1.2 Objectifs dans ce projet

1. Développer une première chaîne de traitement des données qui <br />
   comprendra le **preprocessing** et une étape de **réduction de dimension**.
2. Tenir compte du fait que <u>le volume de données va augmenter <br />
   très rapidement</u> après la livraison de ce projet, ce qui implique de:
 - Déployer le traitement des données dans un environnement **Big Data**
 - Développer les scripts en **pyspark** pour effectuer du **calcul distribué**

## 1.3 Déroulement des étapes du projet

Le projet va être réalisé en 2 temps, dans deux environnements différents. <br />
Nous allons dans un premier temps développer et exécuter notre code en local, <br />
en travaillant sur un nombre limité d'images à traiter.

Une fois les choix techniques validés, nous déploierons notre solution <br />
dans un environnement Big Data en mode distribué.

<u>Pour cette raison, ce projet sera divisé en 3 parties</u>:
1. Liste des choix techniques généraux retenus
2. Déploiement de la solution en local
3. Déploiement de la solution dans le cloud

# 2. Choix techniques généraux retenus

## 2.1 Calcul distribué

L’énoncé du projet nous impose de développer des scripts en **pyspark** <br />
afin de <u>prendre en compte l’augmentation très rapide du volume <br />
de donné après la livraison du projet</u>.

Pour comprendre rapidement et simplement ce qu’est **pyspark** <br />
et son principe de fonctionnement, nous vous conseillons de lire <br />
cet article : [PySpark : Tout savoir sur la librairie Python](https://datascientest.com/pyspark)

<u>Le début de l’article nous dit ceci </u>:<br />
« *Lorsque l’on parle de traitement de bases de données sur python, <br />
on pense immédiatement à la librairie pandas. Cependant, lorsqu’on a <br />
affaire à des bases de données trop massives, les calculs deviennent trop lents.<br />
Heureusement, il existe une autre librairie python, assez proche <br />
de pandas, qui permet de traiter des très grandes quantités de données : PySpark.<br />
Apache Spark est un framework open-source développé par l’AMPLab <br />
de UC Berkeley permettant de traiter des bases de données massives <br />
en utilisant le calcul distribué, technique qui consiste à exploiter <br />
plusieurs unités de calcul réparties en clusters au profit d’un seul <br />
projet afin de diviser le temps d’exécution d’une requête.<br />
Spark a été développé en Scala et est au meilleur de ses capacités <br />
dans son langage natif. Cependant, la librairie PySpark propose de <br />
l’utiliser avec le langage Python, en gardant des performances <br />
similaires à des implémentations en Scala.<br />
Pyspark est donc une bonne alternative à la librairie pandas lorsqu’on <br />
cherche à traiter des jeux de données trop volumineux qui entraînent <br />
des calculs trop chronophages.* »

Comme nous le constatons, **pySpark** est un moyen de communiquer <br />
avec **Spark** via le langage **Python**.<br />
**Spark**, quant à lui, est un outil qui permet de gérer et de coordonner <br />
l'exécution de tâches sur des données à travers un groupe d'ordinateurs. <br />
<u>Spark (ou Apache Spark) est un framework open source de calcul distribué <br />
in-memory pour le traitement et l'analyse de données massives</u>.

Un autre [article très intéressant et beaucoup plus complet pour <br />
comprendre le **fonctionnement de Spark**](https://www.veonum.com/apache-spark-pour-les-nuls/), ainsi que le rôle <br />
des **Spark Session** que nous utiliserons dans ce projet.

<u>Voici également un extrait</u>:

*Les applications Spark se composent d’un pilote (« driver process ») <br />
et de plusieurs exécuteurs (« executor processes »). Il peut être configuré <br />
pour être lui-même l’exécuteur (local mode) ou en utiliser autant que <br />
nécessaire pour traiter l’application, Spark prenant en charge la mise <br />
à l’échelle automatique par une configuration d’un nombre minimum <br />
et maximum d’exécuteurs.*

![Schéma de Spark](img/spark-schema.png)

*Le driver (parfois appelé « Spark Session ») distribue et planifie <br />
les tâches entre les différents exécuteurs qui les exécutent et permettent <br />
un traitement réparti. Il est le responsable de l’exécution du code <br />
sur les différentes machines.

Chaque exécuteur est un processus Java Virtual Machine (JVM) distinct <br />
dont il est possible de configurer le nombre de CPU et la quantité de <br />
mémoire qui lui est alloué. <br />
Une seule tâche peut traiter un fractionnement de données à la fois.*

Dans les deux environnements (Local et Cloud) nous utiliserons donc **Spark** <br />
et nous l’exploiterons à travers des scripts python grâce à **PySpark**.

Dans la <u>version locale</u> de notre script nous **simulerons <br />
le calcul distribué** afin de valider que notre solution fonctionne.<br />
Dans la <u>version cloud</u> nous **réaliserons les opérations sur un cluster de machine**.

## 2.2 Transfert Learning

L'énoncé du projet nous demande également de <br />
réaliser une première chaîne de traitement <br />
des données qui comprendra le preprocessing et <br />
une étape de réduction de dimension.

Il est également précisé qu'il n'est pas nécessaire <br />
d'entraîner un modèle pour le moment.

Nous décidons de partir sur une solution de **transfert learning**.

Simplement, le **transfert learning** consiste <br />
à utiliser la connaissance déjà acquise <br />
par un modèle entraîné (ici **MobileNetV2**) pour <br />
l'adapter à notre problématique.

Nous allons fournir au modèle nos images, et nous allons <br />
<u>récupérer l'avant dernière couche</u> du modèle.<br />
En effet la dernière couche de modèle est une couche softmax <br />
qui permet la classification des images ce que nous ne <br />
souhaitons pas dans ce projet.

L'avant dernière couche correspond à un **vecteur <br />
réduit** de dimension (1,1,1280).

Cela permettra de réaliser une première version du moteur <br />
pour la classification des images des fruits.

**MobileNetV2** a été retenu pour sa <u>rapidité d'exécution</u>, <br />
particulièrement adaptée pour le traitement d'un gros volume <br />
de données ainsi que la <u>faible dimensionnalité du vecteur <br />
de caractéristique en sortie</u> (1,1,1280)

# 3. Déploiement de la solution en local


## 3.1 Environnement de travail

Pour des raisons de simplicité, nous développons dans un environnement <br />
Linux Unbuntu (exécuté depuis une machine Windows dans une machine virtuelle)
* Pour installer une machine virtuelle :  https://www.malekal.com/meilleurs-logiciels-de-machine-virtuelle-gratuits-ou-payants/

## 3.2 Installation de Spark

[La première étape consiste à installer Spark ](https://computingforgeeks.com/how-to-install-apache-spark-on-ubuntu-debian/)



### 3.3 Installation des packages

<u>On installe ensuite à l'aide de la commande **pip** <br />
les packages qui nous seront nécessaires</u> :

In [None]:
# Installation des bibliothèques nécéssaires au fonctionnement du projet
# Pandas : Manipulation de données tabulaires avec des DataFrame et Series.
# Pillow : Bibliothèque de traitement d’image (remplace PIL). Permet d'ouvrir, redimensionner, transformer les images.
# tensorflow :Bibliothèque de deep learning utilisée ici pour charger MobileNetV2 et faire des prédictions.
# Pyspark : Interface Python de Apache Spark, utilisée pour le traitement distribué des données.
# Pyarrow : Utilisé par Spark (et pandas) pour le transfert rapide de données entre Python et JVM (Spark) via Arrow.
!pip install Pandas pillow tensorflow pyspark pyarrow

## 3.4 Import des librairies

In [None]:
# Manipulation de données tabulaires (DataFrames, Series)
import pandas as pd  # Pour manipuler des tableaux de données structurées (CSV, Excel, etc.)

# Bibliothèque pour le traitement d’images (ouvrir, redimensionner, etc.)
from PIL import Image  # Pour ouvrir, manipuler et enregistrer des images dans différents formats

# Bibliothèque pour le calcul numérique (tableaux, manipulation de matrices)
import numpy as np  # Pour les calculs mathématiques rapides sur des tableaux multidimensionnels

# Outils pour la manipulation d’entrées/sorties (I/O) en mémoire, comme les fichiers binaires
import io  # Pour gérer les flux de données en mémoire (ex. : lire/écrire des fichiers sans passer par le disque)

# Outils pour interagir avec le système de fichiers (chemins, environnements, etc.)
import os  # Pour gérer les chemins, variables d’environnement, et opérations système

# Bibliothèque de deep learning pour l'entraînement et l'utilisation de modèles de réseaux de neurones
import tensorflow as tf  # Pour créer, entraîner et utiliser des modèles de deep learning (réseaux de neurones)

# Importation du modèle MobileNetV2 avec ses poids pré-entraînés + fonction de prétraitement des images
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
# MobileNetV2 : modèle CNN léger, pré-entraîné sur ImageNet, adapté à la reconnaissance d’images
# preprocess_input : fonction pour préparer les images au format attendu par MobileNetV2

# Conversion d’images PIL en tableau numpy (utile pour le prétraitement des images)
from tensorflow.keras.preprocessing.image import img_to_array  # Pour convertir une image PIL en tableau numpy

# Pour créer un sous-modèle Keras (par exemple, sans la couche de classification finale)
from tensorflow.keras import Model  # Pour définir des architectures de modèles personnalisés avec Keras

# Fonctions Spark pour manipuler les colonnes et créer des UDFs (User Defined Functions)
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
# col : pour référencer une colonne dans un DataFrame Spark
# pandas_udf : pour créer des fonctions personnalisées vectorisées sur des DataFrames Spark
# PandasUDFType : pour spécifier le type d’UDF (ex. : SCALAR)
# element_at, split : pour manipuler les données dans les colonnes (extraction, découpage de chaînes)

# Pour démarrer une session Spark depuis Python
from pyspark.sql import SparkSession  # Pour initialiser et configurer une application Spark

# Types Spark pour définir des schémas de colonnes dans les DataFrames
from pyspark.sql.types import ArrayType, FloatType  # Pour déclarer des colonnes contenant des listes de float


## 3.5 Définition des PATH pour charger les images <br /> et enregistrer les résultats

Dans cette version locale nous partons du principe que les données <br />
sont stockées dans le même répertoire que le notebook.<br />
Nous n'utilisons qu'un extrait de **300 images** à traiter dans cette <br />
première version en local.<br />
L'extrait des images à charger est stockée dans le dossier **Test1**.<br />
Nous enregistrerons le résultat de notre traitement <br />
dans le dossier "**Results_Local**"

In [None]:
# Importation du module drive de Google Colab pour accéder à Google Drive
from google.colab import drive

# Montage de Google Drive dans l'environnement Colab pour accéder aux fichiers stockés le Drive
drive.mount('/content/drive')

In [None]:
# Définition du chemin principal où se trouve le projet dans Google Drive
PATH = "/content/drive/MyDrive/Colab Notebooks/P#9"

# Définition du chemin vers le dossier contenant les images à traiter (Test1)
PATH_Data = PATH + "/data/Test1"

# Définition du chemin vers le dossier où enregistrer les résultats du traitement
PATH_Result = PATH + "/data/Results"

# Affichage du chemin **des données** pour vérification
print("PATH_Data: ", PATH_Data)

# Affichage du chemin **des résultats** pour vérification
print("PATH_Result: ", PATH_Result)


In [None]:
# Importation du module os pour interagir avec le système de fichiers
import os

# Vérification de l'existence du dossier contenant les images (Test1)
print("Test1 existe :", os.path.exists(PATH_Data))

# Vérification de l'existence du dossier pour enregistrer les résultats (Results)
print("Results existe :", os.path.exists(PATH_Result))


## 3.6 Création de la SparkSession

L’application Spark est contrôlée grâce à un processus de pilotage (driver process) appelé **SparkSession**. <br />
<u>Une instance de **SparkSession** est la façon dont Spark exécute les fonctions définies par l’utilisateur <br />
dans l’ensemble du cluster</u>. <u>Une SparkSession correspond toujours à une application Spark</u>.

<u>Ici nous créons une session spark en spécifiant dans l'ordre</u> :
 1. un **nom pour l'application**, qui sera affichée dans l'interface utilisateur Web Spark 
 2. que l'application doit s'exécuter **localement**. <br />
   Nous ne définissons pas le nombre de cœurs à utiliser (comme .master('local[4]) pour 4 cœurs à utiliser), <br />
   nous utiliserons donc tous les cœurs disponibles dans notre processeur.<br />
 3. une option de configuration supplémentaire permettant d'utiliser le **format "parquet"** <br />
   que nous utiliserons pour enregistrer et charger le résultat de notre travail.
 4. vouloir **obtenir une session spark** existante ou si aucune n'existe, en créer une nouvelle

In [None]:
# Installation de PySpark
!pip uninstall -y pyspark
!pip install pyspark==3.4.1

<u>Nous créons également la variable "**sc**" qui est un **SparkContext** issue de la variable **spark**</u> :

In [None]:
# Importation de la classe SparkSession depuis le module PySpark SQL
from pyspark.sql import SparkSession

# Création de la SparkSession
# La SparkSession est le point d’entrée principal pour exécuter des opérations Spark avec le DataFrame API.
# Le nom de l'application s'affichera dans l'interface Web de Spark (Spark UI).
# Mode d'exécution local : utilise tous les cœurs CPU disponibles de la machine.
# La configuration permet d'écrire les fichiers Parquet au format compatible avec les anciennes versions.
# On crée une session Spark ou on récupère celle déjà existante si elle existe.

spark = SparkSession.builder \
    .appName("P9") \
    .master("local[*]") \
    .config("spark.sql.parquet.writeLegacyFormat", "true") \
    .getOrCreate()


# Récupération du SparkContext depuis la SparkSession
# Le SparkContext permet de manipuler les RDD et d’accéder aux fonctionnalités bas niveau de Spark.
sc = spark.sparkContext


In [None]:
# Importation de la classe SparkSession depuis le module pyspark.sql
from pyspark.sql import SparkSession

# Création d'une SparkSession, point d'entrée principal pour utiliser Spark avec DataFrame API
# 1. On définit le nom de l'application Spark
# 2. On exécute Spark en local, utilise tous les cœurs CPU disponibles
# 3. On crée la session Spark ou récupère celle existante si elle existe déjà
spark = SparkSession.builder \
    .appName("Colab") \
    .master("local[*]") \
    .getOrCreate()

print(" SparkSession créée !")

<u>Affichage des informations de Spark en cours d'execution</u> :

In [None]:
# On affiche les informations de Spark en cours d'execution
spark

**On affiche:**

- Le nom de l'application ('spark.app.name': 'Colab')

- Le mode d’exécution ('spark.master': 'local[*]')

- Le répertoire de travail Spark ('spark.sql.warehouse.dir': 'file:/content/spark-warehouse')

- Les options Java pour le driver et les exécutors (nécessaires dans Colab)

- Les paramètres liés au Parquet ('spark.sql.parquet.writeLegacyFormat': 'true')

- Des infos sur le driver, l’ID de l’application, etc.


In [None]:
# Récupère toutes les configurations Spark sous forme de dictionnaire Python
scala_map = spark.conf._jconf.getAll()
python_dict = {}

iterator = scala_map.iterator()
while iterator.hasNext():
    entry = iterator.next()
    key = entry._1()    # Clé de la configuration
    value = entry._2()  # Valeur de la configuration
    python_dict[key] = value

print(python_dict)


## 3.7 Traitement des données

<u>Dans la suite de notre flux de travail, <br />
nous allons successivement</u> :
1. Préparer nos données
    1. Importer les images dans un dataframe **pandas UDF**
    2. Associer aux images leur **label**
    3. Préprocesser en **redimensionnant nos images pour <br />
       qu'elles soient compatibles avec notre modèle**
2. Préparer notre modèle
    1. Importer le modèle **MobileNetV2**
    2. Créer un **nouveau modèle** dépourvu de la dernière couche de MobileNetV2
3. Définir le processus de chargement des images et l'application <br />
   de leur featurisation à travers l'utilisation de pandas UDF
3. Exécuter les actions d'extraction de features
4. Enregistrer le résultat de nos actions
5. Tester le bon fonctionnement en chargeant les données enregistrées




### 3.7.1 Chargement des données

Les images sont chargées au format binaire, ce qui offre, <br />
plus de souplesse dans la façon de prétraiter les images.

Avant de charger les images, nous spécifions que nous voulons charger <br />
uniquement les fichiers dont l'extension est **jpg**.

Nous indiquons également de charger tous les objets possibles contenus <br />
dans les sous-dossiers du dossier communiqué.

In [None]:
# Importe le module permettant d'accéder à Google Drive depuis Google Colab
from google.colab import drive

# Monte Google Drive dans l'environnement Colab pour accéder aux fichiers
drive.mount('/content/drive')

# Définit le chemin principal vers le dossier du projet dans Google Drive
PATH = "/content/drive/MyDrive/Colab Notebooks/P#9"

# Définit le chemin vers le dossier contenant les données à utiliser
PATH_Data = PATH + "/data/Test1"

# Définit le chemin où les résultats seront enregistrés
PATH_Result = PATH + "/data/Results"

# Affiche le chemin vers les données d'entrée
print("PATH_Data:", PATH_Data)

# Affiche le chemin vers les résultats de sortie
print("PATH_Result:", PATH_Result)


<u>Affichage des 5 premières images contenant</u> :
 - le path de l'image
 - la date et heure de sa dernière modification
 - sa longueur
 - son contenu encodé en valeur hexadécimal

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [None]:
# Lecture des images au format binaire depuis le dossier PATH_Data avec Spark
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")          # Filtre uniquement les fichiers .jpg
    .option("recursiveFileLookup", "true")      # Recherche récursive dans les sous-dossiers
    .load(PATH_Data)                            # Charge les fichiers depuis le dossier défini
)

# Importation des fonctions nécessaires pour manipuler les chaînes dans les colonnes Spark
from pyspark.sql.functions import split, element_at

# Ajoute une colonne "label" qui extrait le nom du dossier parent (catégorie du fruit) à partir du chemin du fichier
images = images.withColumn(
    "label",  # Nom de la nouvelle colonne
    element_at(  # Sélectionne un élément précis dans une liste
        split(images["path"], "/"),  # Découpe le chemin du fichier en une liste selon '/'
        -2  # Prend l'avant-dernier élément, qui correspond au nom du dossier parent (le label)
    )
)

# Affiche les 5 premières lignes du DataFrame, avec le chemin du fichier et le label extrait, sans troncature
images.select("path", "label").show(5, truncate=False)


In [None]:
from pyspark.sql.functions import split, element_at

# Ajoute une colonne 'label' extraite depuis le chemin (avant le nom du fichier)
images = images.withColumn("label", element_at(split(images["path"], "/"), -2))

# Sélectionne toutes les colonnes demandées pour vérification
images.select("path", "modificationTime", "length", "content", "label").show(5, truncate=False)

La colonne **content** contient les données **binaires (brutes)** de **chaque image** chargée depuis le chemin.

### 3.7.2 Préparation du modèle

Je vais utiliser la technique du **transfert learning** pour extraire les features des images.<br />
J'ai choisi d'utiliser le modèle **MobileNetV2** pour sa rapidité d'exécution comparée <br />
à d'autres modèles comme *VGG16* par exemple.

Pour en savoir plus sur la conception et le fonctionnement de MobileNetV2, <br />
je vous invite à lire [cet article](https://towardsdatascience.com/review-mobilenetv2-light-weight-model-image-classification-8febb490e61c).

<u>Voici le schéma de son architecture globale</u> :

![Architecture de MobileNetV2](img/mobilenetv2_architecture.png)

Il existe une dernière couche qui sert à classer les images <br />
selon 1000 catégories que nous ne voulons pas utiliser.<br />
L'idée dans ce projet est de récupérer le **vecteur de caractéristiques <br />
de dimensions (1,1,1280)** qui servira, plus tard, au travers d'un moteur <br />
de classification à reconnaitre les différents fruits du jeu de données.

Comme d'autres modèles similaires, **MobileNetV2**, lorsqu'on l'utilise <br />
en incluant toutes ses couches, attend obligatoirement des images <br />
de dimension (224,224,3). Nos images étant toutes de dimension (100,100,3), <br />
nous devrons simplement les **redimensionner** avant de les confier au modèle.

<u>Dans l'odre</u> :
 1. Nous chargeons le modèle **MobileNetV2** avec les poids **précalculés** <br />
    issus d'**imagenet** et en spécifiant le format de nos images en entrée
 2. Nous créons un nouveau modèle avec:
  - <u>en entrée</u> : l'entrée du modèle MobileNetV2
  - <u>en sortie</u> : l'avant dernière couche du modèle MobileNetV2

In [None]:
# 1. Import des modules nécessaires
from tensorflow.keras.applications import MobileNetV2
from tensorflow.keras.models import Model

# 2. Charger le modèle complet avec les poids ImageNet
model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))

# 3. Supprimer la dernière couche Dense (on garde l'avant-dernière)
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

# 4. Diffuser les poids (paramètres du modèle) via Spark
broadcast_weights = sc.broadcast(new_model.get_weights())



In [None]:
# Importe la classe MobileNetV2 depuis les applications pré-entraînées de Keras
from tensorflow.keras.applications import MobileNetV2
# Importe la classe Model pour créer des modèles personnalisés
from tensorflow.keras.models import Model

# Définit une fonction qui reconstruit le modèle MobileNetV2 avec les poids broadcastés
def model_fn():
    # but de la fonction
    """
    Reconstruit un modèle MobileNetV2 avec les poids 'imagenet',
    supprime la dernière couche Dense, applique les poids broadcastés.
    """
    # Charge le modèle MobileNetV2 complet pré-entraîné sur ImageNet
    model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))
    # Boucle sur toutes les couches du modèle
    for layer in model.layers:
        # Gèle chaque couche pour empêcher sa modification pendant l'entraînement
        layer.trainable = False

    # Crée un nouveau modèle en gardant l'entrée originale mais en excluant la dernière couche
    new_model = Model(inputs=model.input, outputs=model.layers[-2].output)
    # Applique les poids diffusés via Spark au nouveau modèle
    new_model.set_weights(broadcast_weights.value)
    # Retourne le modèle modifié
    return new_model


In [None]:
# Appelle la fonction qui instancie le modèle MobileNetV2 avec les poids broadcastés
model = model_fn()

# Affiche un résumé de l'architecture du modèle avec le nombre de paramètres
print(model.summary())

Le tableau montre l'architecture complète de **MobileNetV2** (_sans la couche de classification finale_) :

    1. Structure : **155 couches** composées de **blocs de convolutions** séparables en profondeur (depthwise separable convolutions)

    2. Entrée : Images de taille (224, 224, 3)

    3. Sortie finale : Vecteur de caractéristiques de dimension (1280) après **Global Average Pooling**

    4. Paramètres : **~2.26 millions de paramètres**, tous non-entraînables (gelés)

    5. Principe : Chaque "block" applique une expansion, une convolution depthwise, puis une projection pour extraire des caractéristiques de plus en plus abstraites

    6. Utilité : Ce vecteur de 1280 dimensions servira de représentation numérique des images pour la reconnaissance des fruits, remplaçant ainsi l'extraction manuelle de caractéristiques

_Un bloc de convolution (ou couche de convolution) est l’élément de base d’un réseau de neurones convolutionnel (CNN), utilisé principalement pour l’analyse et la classification d’images._

In [None]:
# Installation des dépendances nécessaires pour la visualisation des modèles
!pip install pydot
!apt-get install graphviz  # Installation de graphviz (nécessaire avec pydot)

# Import de la fonction pour visualiser l'architecture des modèles Keras
from tensorflow.keras.utils import plot_model

# Création du fichier PNG contenant le schéma de l'architecture du modèle
# model = le modèle MobileNetV2 créé précédemment
# to_file = nom du fichier de sortie
# show_shapes = affiche les dimensions des tenseurs à chaque couche
# show_layer_names = affiche le nom de chaque couche
plot_model(new_model, to_file="mobilenet_reconstruit.png", show_shapes=True, show_layer_names=True)

# Import de la classe pour afficher des images dans Jupyter/Colab
from IPython.display import Image

# Affichage de l'image PNG générée contenant l'architecture du modèle
# filename = chemin vers le fichier PNG créé par plot_model
Image(filename="mobilenet_reconstruit.png")


- Dernière couche : global_average_pooling2d → sortie de forme (None, 1280)

- include_top=True Nous avons tronqué manuellement le modèle pour enlever la couche Dense

- Nombre total de paramètres : 2,257,984

### 3.7.3 Définition du processus de chargement des images et application <br/>de leur featurisation à travers l'utilisation de pandas UDF

Ce notebook définit la logique par étapes, jusqu'à Pandas UDF.

<u>L'empilement des appels est la suivante</u> :

- Pandas UDF
  - featuriser une série d'images pd.Series
   - prétraiter une image

In [None]:
# Import de la bibliothèque pandas pour la manipulation de données tabulaires
import pandas as pd

# Import de numpy pour les calculs numériques sur les tableaux
import numpy as np

# Import du module io pour les opérations d'entrées/sorties en mémoire
import io

# Import de la classe Image de PIL pour ouvrir et manipuler les images
from PIL import Image

# Import du modèle MobileNetV2 pré-entraîné de TensorFlow/Keras
from tensorflow.keras.applications import MobileNetV2

# Import de la fonction de prétraitement spécifique à MobileNetV2
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input

# Import de la classe Model pour créer des modèles personnalisés
from tensorflow.keras.models import Model

# Import de la fonction pour convertir une image PIL en tableau numpy
from tensorflow.keras.preprocessing.image import img_to_array

# Import des fonctions PySpark pour créer des UDF (User Defined Functions)
from pyspark.sql.functions import pandas_udf, col

# Import des types de données PySpark pour définir les schémas
from pyspark.sql.types import ArrayType, FloatType

# 1. Fonction de prétraitement d'une image
def preprocess(content):
    """
    Prétraite une image binaire : ouvre, redimensionne et normalise
    Args: content - données binaires de l'image
    Returns: tableau numpy de l'image prétraitée
    """
    # Ouvre l'image à partir des données binaires et la redimensionne à 224x224 pixels
    img = Image.open(io.BytesIO(content)).resize((224, 224))
    # Convertit l'image PIL en tableau numpy
    arr = img_to_array(img)
    # Applique le prétraitement spécifique à MobileNetV2 (normalisation)
    return preprocess_input(arr)

# 2. Fonction d'extraction de caractéristiques sur une série d'images
def featurize_series(model, content_series: pd.Series) -> pd.Series:
    """
    Extrait les features d'une série d'images avec le modèle MobileNetV2
    Args: model - modèle MobileNetV2, content_series - série pandas d'images
    Returns: série pandas de vecteurs de caractéristiques
    """
    # Applique le prétraitement à chaque image de la série
    input_batch = np.stack(content_series.map(preprocess))
    # Fait la prédiction (extraction de features) sur le batch d'images
    predictions = model.predict(input_batch)
    # Convertit les prédictions en liste aplatie et retourne comme série pandas
    output = [p.flatten() for p in predictions]
    return pd.Series(output)

# 3. Construction du modèle MobileNetV2 sans la couche de classification
def model_fn():
    """
    Crée un modèle MobileNetV2 pour l'extraction de features (sans classification)
    Returns: modèle MobileNetV2 modifié
    """
    # Charge le modèle de base avec les poids ImageNet, sans la couche finale
    base_model = MobileNetV2(weights='imagenet', include_top=False, pooling='avg', input_shape=(224, 224, 3))
    # Retourne un modèle avec l'entrée et la sortie du modèle de base
    return Model(inputs=base_model.input, outputs=base_model.output)

# 4. Définition de l'UDF Pandas pour PySpark avec hints de type
@pandas_udf('array<float>')  # Spécifie que la fonction retourne un array de float
def featurize_udf(content_series: pd.Series) -> pd.Series:
    """
    UDF PySpark qui applique l'extraction de features sur une série d'images
    """
    # Crée le modèle MobileNetV2
    model = model_fn()
    # Applique l'extraction de features sur la série d'images
    return featurize_series(model, content_series)


In [None]:
# Création d'un DataFrame avec les features extraites à partir des 20 premières images
features_df = images.repartition(20).select(
    col("path"),      # Sélectionne la colonne contenant le chemin du fichier image
    col("label"),     # Sélectionne la colonne contenant le label (nom du fruit)
    # Applique l'UDF pour extraire les features et renomme la colonne "content" en "features"
    featurize_udf("content").alias("features")
)

# Affiche les 5 premières lignes du DataFrame avec les features, sans troncature
features_df.show(5, truncate=False)




    1. path : Contient le chemin complet vers chaque fichier image dans le système de fichiers (ex: /data/Test1/apple_golden_1/r1_51.jpg)

    2. label : Contient le nom de la catégorie de fruit extraite automatiquement du nom du dossier parent (ex: apple_golden_1, banana, orange)

    3. features : Contient le vecteur de caractéristiques numériques de dimension 1280 extrait par le modèle MobileNetV2 pour chaque image. Ce vecteur représente les caractéristiques visuelles de l'image sous forme de nombres décimaux qui pourront servir à la reconnaissances des images


1. L'utilité des Users Define Fonctions

Les UDF (fonctions définies par l'utilisateur) permettent d’appliquer une logique personnalisée à des colonnes de DataFrames PySpark lorsque les fonctions natives ne suffisent pas.
Les cas d’usage principaux :

    2. Transformations complexes (ex: NLP, traitement d’images, calculs métiers spécifiques).

    3. Intégration de bibliothèques Python (ex: TensorFlow, scikit-learn, OpenCV).

    4. Manipulations de données non supportées par les fonctions Spark natives (ex: parsing de JSON complexe).

Types d’UDF :

    5. UDF standard : Traite les données ligne par ligne (lent pour les gros volumes).

    6. Pandas UDF (vectorisées) : Traite des batchs de données via Apache Arrow, optimisé pour la performance (jusqu’à 100x plus rapide).

2. A propos des hints (indications de type) ?

Les hints (annotations de type Python) spécifient les types d’entrée/sortie des UDF pour aider PySpark à optimiser l’exécution. Introduits dans Spark 3.0+, ils remplacent l’ancienne syntaxe avec PandasUDFType.

In [None]:
# Sauvegarde des features extraites au format Parquet

# Après avoir extrait les vecteurs de caractéristiques des images à l'aide du modèle MobileNetV2,
# nous sauvegarderons les résultats dans un fichier au format **Parquet**.

# Avantages du format Parquet :
# - une meilleure performance de lecture/écriture dans un environnement distribué (comme Spark),
# - un format compressé optimisé pour le stockage de gros volumes de données.

# Définition du chemin de sortie pour les features
PATH_Result = "/content/features_output"

# Sauvegarde du DataFrame au format Parquet avec mode "overwrite" (écrase si existe)
features_df.write.mode("overwrite").parquet(PATH_Result)


### 3.7.4 Exécution des actions d'extraction de features


In [None]:
#  Cette ligne permet de réduire la taille des batchs utilisés par Arrow (backend de Pandas UDF) pour éviter les erreurs Out Of Memory sur de très gros fichiers (ex : très grandes images). Ici, elle est laissée en commentaire car non utilisée dans ce projet.
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.

In [None]:
print(sc._jvm)  # Doit retourner un objet non null et sans erreur

In [None]:
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, FloatType
import pandas as pd


In [None]:
#  Déclaration d'une UDF (User Defined Function) en mode Pandas avec Spark
# Elle prend une colonne de type `binary` (ici les contenus d'images) et retourne un vecteur de float (ArrayType(FloatType()))
@pandas_udf(ArrayType(FloatType()))
def featurize_udf(content_series: pd.Series) -> pd.Series:
    # Création du modèle MobileNetV2 avec les mêmes paramètres (architecture sans la couche Dense de classification)
    model = model_fn()

    # Chargement des poids pré-entraînés qui ont été broadcastés à tous les workers Spark
    model.set_weights(broadcast_weights.value)

    # Liste qui contiendra les vecteurs de caractéristiques de chaque image
    features = []

    # Boucle sur chaque image contenue dans la série (format bytes)
    for bytestr in content_series:
        try:
            # Chargement de l’image à partir de son contenu brut (bytes) et redimensionnement à (224, 224)
            img = Image.open(io.BytesIO(bytestr)).resize((224, 224))

            # Conversion en tableau NumPy (on garde seulement les 3 canaux RGB)
            arr = np.array(img)[..., :3]

            # Ajout d’une dimension (car le modèle attend un batch de forme (1, 224, 224, 3))
            arr = np.expand_dims(arr, axis=0)

            # Prétraitement requis par MobileNetV2 (normalisation des pixels, etc.)
            arr = preprocess_input(arr)

            # Prédiction avec le modèle → vecteur de caractéristiques (de dimension (1280,))
            vec = model.predict(arr)[0].flatten()

            # Ajout du vecteur à la liste
            features.append(vec)

        except Exception as e:
            # En cas d'erreur (image corrompue, format non reconnu...), on ajoute un vecteur nul
            features.append(np.zeros(1280))

    # On retourne une Series Pandas contenant tous les vecteurs de caractéristiques
    return pd.Series(features)


In [None]:
# Vérification rapide avant l'exécution complète
print(f"Taille des données : {images.count()}")
print(f"Partitions créées : {features_df.rdd.getNumPartitions()}")


In [None]:
# On crée un nouveau DataFrame `features_df` contenant les résultats de l'extraction de features
features_df = (
    images
    # On limite ici volontairement à 330 images pour accélérer le traitement en mode local ou test
    .limit(330)

    # Répartition des données en 20 partitions pour profiter du parallélisme sur les workers Spark
    .repartition(20)

    # Sélection des colonnes utiles :
    .select(
        col("path"),         # Le chemin du fichier image
        col("label"),        # Le label (classe de l’image, par exemple "apple", "orange", etc.)

        # Extraction des features à partir du contenu binaire de l'image (colonne 'content')
        # via la fonction UDF définie précédemment. Le résultat est une colonne de vecteurs de float.
        featurize_udf("content").alias("features")
    )
)


<u>Affichage des 5 premières images contenant</u> :
 - le path de l'image
 - la date et heure de sa dernière modification
 - sa longueur
 - son contenu encodé en valeur hexadécimal

<u>Rappel du PATH où seront inscrits les fichiers au format "**parquet**" <br />
contenant nos résultats, à savoir, un DataFrame contenant 3 colonnes</u> :
 1. Path des images
 2. Label de l'image
 3. Vecteur de caractéristiques de l'image

In [None]:
features_df.show(5, truncate=False)

In [None]:
# Définition du dossier de sortie pour la sauvegarde des features extraites.
PATH_Result = "/content/features_output"

- Le format Parquet est un format de fichier colonne, compressé et optimisé pour le Big Data, qui permet de stocker et d’analyser efficacement de grandes quantités de données, tout en accélérant les requêtes analytiques et en réduisant l’espace de stockage

In [None]:
# Enregistrement des images au format parquet
features_df.write.mode("overwrite").parquet(PATH_Result)

### Extraction et sauvegarde des features
À ce stade, nous avons réalisé les étapes suivantes :

- Chargé le modèle MobileNetV2 sans la couche de classification (include_top=False, pooling='avg')

- Prétraité les images en les redimensionnant à (224, 224) et en appliquant la normalisation attendue par le modèle

- Converti chaque image en un vecteur de caractéristiques de dimension (1280,)

- Utilisé une Pandas UDF pour paralléliser l’extraction des features à l’aide de Spark

- Réparti les données en 20 partitions pour exploiter le parallélisme

- Limité l’échantillon à 330 images pour faciliter les tests en mode local

- Enregistré les résultats au format parquet à l’emplacement spécifié par PATH_Result

- Le DataFrame final contient 3 colonnes :
path : chemin d'accès de l'image

- label : étiquette (classe) de l'image

- features : vecteur de caractéristiques (extrait par le modèle)

<u>Enregistrement des données traitées au format "**parquet**"</u> :

- Format de sauvegarde : Parquet
- Dimensions des features : 1280 floats par image
- Utilisation typique : ces features peuvent servir pour l'entraînement ou l'évaluation de modèles de classification

Qu’est-ce que le format Parquet ?

Parquet est un format de fichier ouvert, orienté colonne, conçu pour le stockage et le traitement efficace de grandes quantités de données, notamment dans les environnements Big Data et Cloud


    Grâce au format parquet nous pouvons stocker les données sous forme compressée et optimisée : Parquet compresse efficacement les données, ce qui réduit la taille des fichiers et les coûts de stockage

Accélérer les analyses : Grâce à son organisation par colonne, Parquet permet de lire seulement les colonnes nécessaires lors des requêtes, ce qui accélère considérablement les traitements analytiques par rapport à des formats comme CSV ou JSON


Faciliter l’intégration avec les outils Big Data : Parquet est nativement supporté par Spark, Hive, Presto, AWS Athena, Google BigQuery et la plupart des outils de data lakes modernes


Caractéristiques principales du format Parquet

    Stockage orienté colonne : Les données sont organisées colonne par colonne, ce qui permet de lire ou de compresser chaque colonne séparément

Compression et encodage avancés : Parquet applique des algorithmes de compression et d’encodage efficaces, adaptés à chaque type de donnée, pour optimiser l’espace et la rapidité

Support des types complexes : Parquet gère naturellement les listes, dictionnaires et structures imbriquées, ce qui est utile pour les données complexes

Métadonnées riches : Le fichier stocke des informations sur le schéma, la structure, les statistiques et la localisation des données, facilitant l’optimisation des requêtes

Scalabilité : Idéal pour traiter des millions ou milliards de lignes dans des architectures distribuées.


## 3.8 Chargement des données enregistrées et validation du résultat

<u>On charge les données fraichement enregistrées dans un **DataFrame Pandas**</u> :

In [None]:
# 1. Chargement du fichier Parquet sauvegardé
import pandas as pd
df = pd.read_parquet(PATH_Result, engine='pyarrow')

# 2. Affichage des premières lignes du DataFrame
df.head()

# 3. Validation que chaque vecteur de caractéristiques fait bien 1280 dimensions
df.loc[0, 'features'].shape

In [None]:
df.head()

In [None]:
# Vérification que toutes les lignes ont bien 1280 dimensions
assert all(df['features'].apply(lambda x: len(x) == 1280)), "Certaines lignes ont une mauvaise dimension"

Nous avons validé que :

- Le fichier Parquet est lisible

- Les vecteurs extraits ont bien la taille de 1280 flottants

- Le format de données est prêt pour les prochaines étapes : réduction de dimension, visualisation ou entraînement d’un modèle de classification

- Nous pouvons maintenant passer à la section suivante : réduction de dimension par PCA avec PySpark.

## 3.9 Réduction de dimension PCA en PySpark

**La réduction de dimension nous servira à réduire le volume de données à traiter :**

- Les features brutes extraites du modèle (1280 dimensions) sont très lourdes. Cela ralentit les traitements, surtout à grande échelle.

- Supprimer le bruit :
Certaines dimensions n'apportent que peu d'information. PCA permet de garder l'essentiel de la variance avec moins de dimensions (ex : 50).

- Accélérer l'entraînement futur des modèles :
Moins de dimensions = entraînement plus rapide + moins de mémoire.

- Faciliter la visualisation :
Réduire à 2 ou 3 dimensions permet de projeter visuellement les données.


La réduction de dimension a pour objectif de simplifier les données tout en conservant l’essentiel de l’information. Dans notre cas, chaque image est représentée par un vecteur de 1280 dimensions (extrait via MobileNetV2). Cela peut poser des problèmes :

- de lourdeur pour les traitements,

- de bruit (certaines dimensions n’apportent pas d'information utile),

- et de visualisation ou entraînement plus complexe.

Nous avons donc appliqué une réduction de dimension via PCA (Analyse en Composantes Principales) avec PySpark, selon les étapes suivantes :

1. Rechargement du DataFrame Spark contenant les features extraites.

2. Conversion de la colonne features en vecteur MLlib avec VectorUDT (format requis pour les algos Spark ML).

3. Application d’un PCA avec 50 composantes pour réduire le vecteur (1280 ➝ 50).

4. Affichage du DataFrame transformé avec les nouvelles colonnes features_pca.

- Interprétation visuelle du PCA
Pour évaluer la pertinence de la réduction de dimension, nous avons projeté les 2 premières composantes principales dans un plan 2D.
Le graphique obtenu montre une bonne séparation entre les classes apple_6 et apple_golden_1, ce qui valide :

- l’efficacité de la méthode d’extraction des features (MobileNetV2),

- la pertinence du PCA,

- la qualité des données.

Cela confirme que les données sont prêtes pour un entraînement efficace de modèle de classification.

In [None]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf, col
from pyspark.sql import functions as F

# 1. Recharger le DataFrame parquet en Spark
df_spark = spark.read.parquet(PATH_Result)

# 2. Convertir la colonne de listes 'features' en vecteur MLlib
vectorize_udf = udf(lambda x: Vectors.dense(x), VectorUDT())
df_spark = df_spark.withColumn("features_vec", vectorize_udf(col("features")))

# 3. Appliquer PCA avec 50 dimensions
pca = PCA(k=50, inputCol="features_vec", outputCol="features_pca")
model = pca.fit(df_spark)
df_pca = model.transform(df_spark)

# 4. Visualisation du résultat
df_pca.select("path", "label", "features_pca").show(5, truncate=False)


### Interprétation du PCA

Ce graphique montre la projection des vecteurs d'images dans l'espace réduit à 2 dimensions à l'aide du PCA.

On observe que les classes `apple_6` et `apple_golden_1` sont bien séparées, ce qui valide :
- la qualité du prétraitement des images,
- l'efficacité du modèle de featurisation (MobileNetV2),
- et la pertinence de la réduction de dimension par PCA.

Cela nous rassure sur la capacité future à entraîner un modèle de classification performant.

In [None]:
import pandas as pd
import matplotlib.pyplot as plt

# Convertir en Pandas pour visualiser
sample_df = df_pca.select("features_pca", "label").limit(300).toPandas()

# Convertir Spark Vector en liste
sample_df["features_pca"] = sample_df["features_pca"].apply(lambda v: v.toArray())

# Extraire les deux premières composantes
x = sample_df["features_pca"].apply(lambda x: x[0])
y = sample_df["features_pca"].apply(lambda x: x[1])

# Affichage en scatter plot avec la couleur selon le label
plt.figure(figsize=(8, 6))
for label in sample_df["label"].unique():
    plt.scatter(
        x[sample_df["label"] == label],
        y[sample_df["label"] == label],
        label=label,
        alpha=0.6
    )

plt.title("Projection PCA des images (2 premières dimensions)")
plt.xlabel("PCA 1")
plt.ylabel("PCA 2")
plt.legend()
plt.grid(True)
plt.show()


- Nous appliquons le **PCA** pour réduire la dimension des vecteurs de **caractéristiques extraits par MobileNetV2** : cela permet de conserver l’essentiel de l’information, d’accélérer les traitements, de supprimer le bruit, et de faciliter la visualisation et l’entraînement des modèles de classification

# 4. Déploiement de la solution sur le cloud

### 4.10.1 Démarrage de la session Spark

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [1]:
# Démarrer l'application Spark
spark

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1750769563434_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<pyspark.sql.session.SparkSession object at 0x7f6c68697130>

In [2]:
# Vérifier les ressources disponibles
spark.sparkContext._conf.getAll()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('spark.eventLog.enabled', 'true'), ('spark.driver.extraLibraryPath', '/usr/lib/hadoop/lib/native:/usr/lib/hadoop-lzo/lib/native:/usr/lib/jvm/java-17-amazon-corretto.x86_64/lib/server:/docker/usr/lib/hadoop/lib/native:/docker/usr/lib/hadoop-lzo/lib/native:/docker/usr/lib/jvm/java-17-amazon-corretto.x86_64/lib/server'), ('spark.driver.extraClassPath', '/usr/lib/hadoop-lzo/lib/*:/usr/lib/hadoop/hadoop-aws.jar:/usr/share/aws/aws-java-sdk/*:/usr/share/aws/aws-java-sdk-v2/*:/usr/share/aws/emr/goodies/lib/emr-spark-goodies.jar:/usr/share/aws/emr/security/conf:/usr/share/aws/emr/security/lib/*:/usr/share/aws/redshift/jdbc/*:/usr/share/aws/redshift/spark-redshift/lib/*:/usr/share/aws/kinesis/spark-sql-kinesis/lib/*:/usr/share/aws/hmclient/lib/aws-glue-datacatalog-spark-client.jar:/usr/share/java/Hive-JSON-Serde/hive-openx-serde.jar:/usr/share/aws/emr/s3select/lib/emr-s3-select-spark-connector.jar:/docker/usr/lib/hadoop-lzo/lib/*:/docker/usr/lib/hadoop/hadoop-aws.jar:/docker/usr/share/aws/aws-

### 4.10.2 Installation des packages
### 4.10.3 Import des librairies

In [3]:
# Installation de versions compatibles dans le Kernel PySpark
sc.install_pypi_package("tensorflow==2.11.0")
sc.install_pypi_package("keras==2.11.0")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting tensorflow==2.11.0
  Downloading tensorflow-2.11.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (588.3 MB)
Collecting gast<=0.4.0,>=0.2.1
  Downloading gast-0.4.0-py3-none-any.whl (9.8 kB)
Collecting protobuf<3.20,>=3.9.2
  Downloading protobuf-3.19.6-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (1.1 MB)
Collecting keras<2.12,>=2.11.0
  Downloading keras-2.11.0-py2.py3-none-any.whl (1.7 MB)
Collecting tensorflow-estimator<2.12,>=2.11.0
  Downloading tensorflow_estimator-2.11.0-py2.py3-none-any.whl (439 kB)
Collecting tensorboard<2.12,>=2.11
  Downloading tensorboard-2.11.2-py3-none-any.whl (6.0 MB)
Collecting tensorboard-data-server<0.7.0,>=0.6.0
  Downloading tensorboard_data_server-0.6.1-py3-none-manylinux2010_x86_64.whl (4.9 MB)
Installing collected packages: tensorboard-data-server, protobuf, tensorflow-estimator, tensorboard, keras, gast, tensorflow
  Attempting uninstall: protobuf
    Found existing installation: protobuf 3.20.3
    Not uninstal

In [4]:
# Imports
import pandas as pd
import numpy as np
import os
import io

# Import de TensorFlow uniquement depuis tensorflow.keras
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array


# PySpark
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType

# Création session Spark
spark = SparkSession.builder.getOrCreate()


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.10.3.1 Configuration de Amazon Web Services (AWS)

| Service      | Définition de l’acronyme                       | Définition courte                                                                 | Utilisation principale                                           |
|--------------|------------------------------------------------|----------------------------------------------------------------------------------|------------------------------------------------------------------|
| **EC2**      | **Elastic Compute Cloud**                          | Serveurs virtuels (instances) à la demande dans le cloud.                        | Calcul, exécution de scripts, clusters Spark/Hadoop, ML.         |
| **S3**       | **Simple Storage Service**                         | Stockage d’objets scalable pour tout type de données.                            | Data lake, stockage de datasets, partage de fichiers volumineux. |
| **S3A**      | S3 Advanced (connecteur Hadoop/Spark)          | Connecteur Hadoop/Spark pour accéder à S3 comme un système de fichiers distribué.| Lecture/écriture directe sur S3 depuis Spark, Hadoop, etc.       |
| **IAM**      | **Identity and Access Management**                 | Gestion des accès et permissions aux ressources AWS.                             | Sécurité, gestion des utilisateurs et des droits.                |
| **EMR**      | **Elastic MapReduce**                              | Service managé pour exécuter Spark, Hadoop, Hive sur des clusters EC2.           | Traitement massif de données, analyse distribuée, ETL.           |
| **Glue**     | -                                              | Service ETL managé pour extraire, transformer et charger des données.            | Préparation, nettoyage, transformation, catalogage des données.  |
| **Redshift** | -                                              | Data warehouse cloud scalable pour l’analyse rapide de grandes quantités de données. | Analyse de données, BI, requêtes analytiques massives.        |
| **Kinesis**  | -                                              | Collecte, traitement et analyse de flux de données en temps réel.                | Streaming, ingestion temps réel, analyse de logs ou IoT.         |
| **Athena**   | -                                              | Requête interactive SQL sur des données stockées dans S3.                        | Analyse ad hoc, exploration rapide de données sur S3.            |
| **SageMaker**| -                                              | Plateforme managée pour construire, entraîner et déployer des modèles ML.        | Développement, entraînement, mise en production de modèles ML.   |
| **Lambda**   | -                                              | Calcul serverless pour exécuter du code en réponse à des événements.             | Orchestration, automatisation, traitement d’événements.          |


### 4.10.4 Définition des PATH pour charger les images et enregistrer les résultats

In [5]:
# Chemins S3 pour données d'entrée et de sortie
PATH_Data_S3 = "s3://aws-emr-studio-065693560228-eu-west-3/Test1/"
PATH_Result_S3 = "s3://aws-emr-studio-065693560228-eu-west-3/Results/"

# Vérification
print(" PATH_Data_S3 :", PATH_Data_S3)
print(" PATH_Result_S3 :", PATH_Result_S3)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

 PATH_Data_S3 : s3://aws-emr-studio-065693560228-eu-west-3/Test1/
 PATH_Result_S3 : s3://aws-emr-studio-065693560228-eu-west-3/Results/

### 4.10.5 Traitement des données

#### 4.10.5.1 Chargement des données

In [6]:
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data_S3)
)
images.show(5)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://aws-emr-stud...|2025-06-20 09:28:56| 31310|[FF D8 FF E0 00 1...|
|s3://aws-emr-stud...|2025-06-20 09:28:56| 30863|[FF D8 FF E0 00 1...|
|s3://aws-emr-stud...|2025-06-20 09:28:57| 29604|[FF D8 FF E0 00 1...|
|s3://aws-emr-stud...|2025-06-20 09:28:56| 29521|[FF D8 FF E0 00 1...|
|s3://aws-emr-stud...|2025-06-20 09:28:57| 29290|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [7]:
from pyspark.sql.functions import split, element_at

# Ajoute une colonne 'label' qui extrait le nom du dossier parent (catégorie) depuis le chemin du fichier
images = images.withColumn(
    'label',                                    # Nom de la nouvelle colonne
    element_at(split(images['path'], '/'), -2)  # Prend l'avant-dernier élément du chemin (le dossier parent)
)

# Affiche le schéma du DataFrame pour vérifier la structure des colonnes
images.printSchema()

# Affiche les 5 premières lignes du DataFrame avec seulement les colonnes 'path' et 'label'
images.select('path', 'label').show(5, False)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+-------------------------------------------------------------------------+--------------+
|path                                                                     |label         |
+-------------------------------------------------------------------------+--------------+
|s3://aws-emr-studio-065693560228-eu-west-3/Test1/apple_golden_1/r0_23.jpg|apple_golden_1|
|s3://aws-emr-studio-065693560228-eu-west-3/Test1/apple_golden_1/r0_19.jpg|apple_golden_1|
|s3://aws-emr-studio-065693560228-eu-west-3/Test1/apple_golden_1/r0_11.jpg|apple_golden_1|
|s3://aws-emr-studio-065693560228-eu-west-3/Test1/apple_golden_1/r0_15.jpg|apple_golden_1|
|s3://aws-emr-studio-065693560228-eu-west-3/Test1/apple_golden_1/r0_7.jpg |apple_golden_1|
+------------------------------------------------------------------

- Nous avons chargé toutes les images au format binaire avec Spark, puis ajouté une colonne label correspondant à la catégorie de chaque image, extraite automatiquement du nom du dossier parent dans le chemin.

- Cette étape prépare les données pour l’extraction de features et la suite du pipeline de traitement.

#### 4.10.5.2 Préparation du modèle

### 4.10.5.2 Préparation du modèle MobileNetV2

Dans cette section, nous préparons le modèle de Deep Learning utilisé pour extraire des features visuelles à partir des images stockées dans notre bucket S3.

Nous utilisons **MobileNetV2**, un modèle léger et efficace préentraîné sur ImageNet, particulièrement adapté à un traitement distribué.

#### Étapes réalisées :
- Chargement de `MobileNetV2` avec les poids `imagenet`
- Suppression de la dernière couche de classification (nous voulons uniquement les features)
- Désactivation de l'entraînement des couches (inférence uniquement)
- Diffusion (broadcast) des poids du modèle à l’ensemble des workers du cluster Spark

Cela nous permet ensuite d’utiliser ce modèle au sein d’une fonction distribuée (`model_fn`) compatible avec un `mapInPandas` ou `rdd.map()` pour le traitement parallèle des images.

In [8]:
# 1. Import des modules nécessaires
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input  # Import du modèle MobileNetV2 et de la fonction de prétraitement
from tensorflow.keras.models import Model  # Import de la classe Model pour créer un modèle personnalisé
import numpy as np  # Import de numpy pour la manipulation des tableaux

# 2. Création du modèle sans la dernière couche de classification
base = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224,224,3))  # Charge MobileNetV2 avec les poids ImageNet
model = Model(inputs=base.input, outputs=base.layers[-2].output)  # Crée un modèle qui sort l'avant-dernière couche (vecteur de features)


# Récupère et **broadcast** les poids (paramètres) du modèle pré-entraîné
# Ici on Broadcast des poids sur les workers Spark
sc = spark.sparkContext  # Récupère le SparkContext depuis la SparkSession
weights = model.get_weights()  # Récupère les poids du modèle
broadcast_weights = sc.broadcast(weights)  # Diffuse les poids à tous les workers Spark


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [9]:
# Fonction pour reconstruire le modèle dans chaque partition Spark
def get_model():
    m = MobileNetV2(weights=None, include_top=True, input_shape=(224,224,3))  # Crée un modèle MobileNetV2 sans poids
    m = Model(inputs=m.input, outputs=m.layers[-2].output)  # Enlève la dernière couche de classification
    m.set_weights(broadcast_weights.value)  # Charge les poids diffusés dans le modèle
    return m  # Retourne le modèle prêt à l’emploi


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Installation du package OpenCV en version "headless" (sans interface graphique),
# nécessaire pour traiter les images dans l'environnement EMR via PySpark.
# La version est fixée à 4.7.0.72 pour éviter les incompatibilités.
sc.install_pypi_package("opencv-python-headless==4.7.0.72")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Collecting opencv-python-headless==4.7.0.72
  Downloading opencv_python_headless-4.7.0.72-cp37-abi3-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (49.2 MB)
Installing collected packages: opencv-python-headless
Successfully installed opencv-python-headless-4.7.0.72


In [11]:
# Imports nécessaires

# NumPy est utilisé pour manipuler efficacement des tableaux numériques (ex. : conversion d'image en array)
import numpy as np

# OpenCV (version headless) est une bibliothèque de vision par ordinateur utilisée ici pour charger et redimensionner les images
import cv2

# io permet de gérer des flux d'entrée/sortie en mémoire (utile pour manipuler les données binaires d'image, comme depuis un fichier S3)
import io

# Fonction de prétraitement de MobileNetV2 : elle applique les transformations nécessaires pour que les images soient compatibles avec le modèle
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input

# Convertit une image en tableau (array) exploitable par TensorFlow
from tensorflow.keras.preprocessing.image import img_to_array

# Permet de définir une fonction personnalisée (UDF) à appliquer sur les colonnes d'un DataFrame Spark
from pyspark.sql.functions import udf

# Types Spark utilisés pour définir le type de retour de l’UDF : ici, un tableau de float (ArrayType(FloatType()))
from pyspark.sql.types import ArrayType, FloatType


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Fonction pour reconstruire le modèle MobileNetV2 avec les poids diffusés (broadcastés) par Spark
def get_model():

    # Vérifie que les poids ont bien été diffusés. Si ce n'est pas le cas, une erreur est levée pour éviter un modèle vide ou incorrect.
    if broadcast_weights.value is None:
        raise ValueError("Les poids du modèle n'ont pas été diffusés correctement.")

    # Reconstruit l'architecture MobileNetV2 sans les poids pré-entraînés (weights=None),
    # sans la couche de classification finale (include_top=False), avec une couche de pooling global (avg).
    m = MobileNetV2(weights=None, include_top=False, pooling="avg", input_shape=(224, 224, 3))

    # Crée un modèle Keras en prenant l'entrée de MobileNetV2,
    # mais en sortant la couche juste avant la dernière (pour obtenir les embeddings, pas la classification)
    model = Model(inputs=m.input, outputs=m.layers[-2].output)

    # Applique les poids diffusés par Spark au modèle reconstruit
    model.set_weights(broadcast_weights.value)

    # Retourne le modèle prêt à faire de l’inférence
    return model


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# Fonction pour extraire les embeddings d'une image (vecteur numérique décrivant son contenu)
def extract_embedding(content):
    try:
        # Conversion des octets bruts de l'image en tableau numpy (tableau de uint8)
        nparr = np.frombuffer(content, np.uint8)

        # Décodage du tableau en image OpenCV (BGR)
        img = cv2.imdecode(nparr, cv2.IMREAD_COLOR)

        # Redimensionnement de l'image à 224x224 pixels pour MobileNetV2
        img = cv2.resize(img, (224, 224))

        # Conversion de l'image en tableau compatible avec Keras
        x = img_to_array(img)

        # Ajout d'une dimension pour simuler un batch de taille 1 (forme : (1, 224, 224, 3))
        x = np.expand_dims(x, axis=0)

        # Prétraitement selon les attentes de MobileNetV2 (normalisation, mise à l'échelle, etc.)
        x = preprocess_input(x)

        # Récupération du modèle reconstruit avec les poids diffusés par Spark
        model = get_model()

        # Prédiction (extraction des features) avec le modèle, sortie de forme (1, 1280)
        features = model.predict(x)

        # Aplatit le vecteur (supprime la dimension batch) et convertit en liste Python
        return features.flatten().tolist()

    except Exception:
        # Si une erreur survient (image corrompue, format invalide…), retourne un vecteur neutre de 1280 zéros
        return [0.0] * 1280  # MobileNetV2 produit un vecteur de 1280 dimensions


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
# Déclaration d'une fonction UDF (User Defined Function) pour Spark
# Cela permet d'exécuter la fonction extract_embedding sur chaque ligne d'une DataFrame Spark
# Elle renvoie un tableau (ArrayType) de float (FloatType), ce qui correspond au vecteur d'embedding extrait de l'image
extract_embedding_udf = udf(extract_embedding, ArrayType(FloatType()))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Application de la fonction UDF 'extract_embedding_udf' à chaque ligne de la colonne "content"
# "content" contient ici les données binaires (brutes) des images lues depuis S3
# La fonction 'extract_embedding_udf' va extraire un vecteur d'embedding (représentation numérique de l'image)
# Une nouvelle colonne "embedding" est ajoutée au DataFrame avec le résultat
df_emb = images.withColumn("embedding", extract_embedding_udf("content"))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# On vérifie que l'on a bien extrait les vecteurs de caractéristiques (features)
# pour toutes les images lues depuis le stockage S3.
# La fonction count() retourne le nombre total de lignes dans le DataFrame df_emb,
# chaque ligne correspondant à une image avec son embedding calculé.
print("Nombre d'images avec features extraites :", df_emb.count())


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Nombre d'images avec features extraites : 12

In [17]:
# On sélectionne uniquement les colonnes "path" (chemin de l'image sur S3)
# et "embedding" (vecteur de caractéristiques extrait par le modèle)
# On affiche les 5 premières lignes sans troncature pour voir les embeddings en entier.
df_emb.select("path", "embedding").show(5, truncate=False)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

#### 4.10.5.4 — Extraction distribuée de caractéristiques visuelles à l’aide de MobileNetV2 sur Spark

Dans cette partie, nous avons réalisé l’extraction distribuée de caractéristiques visuelles à partir d’images en utilisant le modèle MobileNetV2 pré-entraîné.
Pour cela, nous avons :

1. Supprimé la couche de classification finale du modèle pour ne garder que les features ;

2. Diffusé les poids du modèle à l’ensemble des workers Spark via un mécanisme de broadcast ;

3. Appliqué le modèle à chaque image de manière distribuée en définissant une UDF vectorisée dans PySpark.

4. Cette approche permet d’extraire automatiquement les vecteurs de caractéristiques des images à grande échelle, en tirant pleinement parti des capacités parallèles de Spark.

In [18]:
# Optimisation de la taille des batches envoyés aux fonctions UDF via Arrow
# Cela permet de réduire la surcharge de transfert entre Spark et les fonctions Python (pandas UDF)
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
def extract_embedding(content):
    try:
        # Ouverture du contenu binaire de l'image en format RGB
        img = Image.open(io.BytesIO(content)).convert("RGB")

        # Redimensionnement de l'image à 224x224 pixels (taille attendue par MobileNetV2)
        img = img.resize((224, 224))

        # Conversion de l'image en tableau numpy
        x = image.img_to_array(img)

        # Ajout d'une dimension pour simuler un batch (1, 224, 224, 3)
        x = np.expand_dims(x, axis=0)

        # Prétraitement spécifique à MobileNetV2 (mise à l’échelle, centrage)
        x = preprocess_input(x)

        # Création locale du modèle MobileNetV2 sans couche finale
        model = MobileNetV2(weights=None, include_top=False, pooling='avg', input_shape=(224, 224, 3))

        # Injection des poids préalablement diffusés via Spark (broadcast)
        model.set_weights(broadcast_weights.value)

        # Prédiction : on obtient les embeddings de l'image
        features = model.predict(x)

        # Retourne les features sous forme de liste de float
        return features.flatten().tolist()

    except Exception:
        # En cas d'erreur, on retourne un vecteur vide pour éviter que le pipeline plante
        return []


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Réglage de l’algorithme de commit de fichiers en sortie sous Hadoop
# Version 2 est plus efficace avec Amazon EMR pour éviter les problèmes de surcharge I/O
spark.conf.set("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Sauvegarde du DataFrame df_emb au format Parquet dans le dossier spécifié sur S3
# mode("overwrite") écrase les fichiers existants à cet emplacement
# Le format Parquet est un format colonne compressé, idéal pour le stockage et les traitements distribués avec Spark.
# Il est très utilisé dans les architectures Big Data car il est à la fois efficace et compatible avec de nombreux outils (Athena, EMR, etc.).
df_emb.write.mode("overwrite").parquet(PATH_Result_S3)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Affiche le schéma (structure) du DataFrame Spark : noms, types et nullabilité des colonnes
df_emb.printSchema()

# Affiche les deux premières lignes de la colonne "embedding" sans tronquer les données
df_emb.select("embedding").show(2, truncate=False)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
 |-- embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)

+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------


- **Contrôle de la colonne embedding** :
1. Nous avons vérifié que la colonne embedding, contenant les vecteurs de caractéristiques extraits des images, a bien été générée.

- **Validation du schéma Spark** :
2. Le schéma du DataFrame est conforme : embedding est bien un tableau de float, ce qui est attendu pour une future étape de classification ou d’indexation.

- **Contrôle qualité visuel** :
3. L’affichage de quelques vecteurs permet de s’assurer que les embeddings sont bien structurés, non vides, et prêts à être exportés pour un usage ultérieur


### PCA

In [23]:
# Imports nécessaires pour la réduction de dimension et la vectorisation

from pyspark.ml.feature import PCA, VectorAssembler
# PCA : pour appliquer l'algorithme de réduction de dimension
# VectorAssembler : pour assembler les features en un seul vecteur utilisable par Spark ML

from pyspark.ml.linalg import Vectors, DenseVector
# Vectors, DenseVector : types spécifiques utilisés par Spark ML pour manipuler des vecteurs numériques

from pyspark.sql.functions import udf
# udf : permet de créer des fonctions personnalisées appliquées sur les DataFrames (UDF = User Defined Function)

from pyspark.sql.types import ArrayType, FloatType
# Types utilisés pour spécifier la structure des colonnes manipulées dans les UDF (tableaux de float ici)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Chargement des features extraites précédemment depuis le stockage S3
features_df = spark.read.parquet(PATH_Result_S3)
# Lecture du fichier Parquet contenant les vecteurs d'embeddings extraits des images

# Renommer la colonne 'embedding' en 'features' pour compatibilité avec Spark ML
features_df = features_df.withColumnRenamed("embedding", "features")
# Spark ML attend une colonne nommée 'features' pour utiliser VectorAssembler ou PCA

# Vérification du contenu chargé
print("Nombre d'images avec features extraites:", features_df.count())
# Affiche le nombre total d'images traitées

features_df.select("path", "features").show(5, truncate=False)
# Affiche un échantillon (5 lignes) avec le chemin des images et leurs vecteurs de caractéristiques


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Nombre d'images avec features extraites: 12
+-------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [25]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

# UDF (User Defined Function) pour tronquer les vecteurs trop longs (par exemple ici à 200 dimensions max)
def truncate_vector(arr, max_len=200):
    return arr[:max_len] if arr else []

# Déclaration de la fonction truncate_vector comme UDF utilisable dans PySpark
truncate_udf = udf(lambda x: truncate_vector(x), ArrayType(FloatType()))

# Ajout d'une nouvelle colonne "features_truncated" avec les vecteurs réduits
features_df = features_df.withColumn("features_truncated", truncate_udf("features"))


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import col, udf
from pyspark.ml.feature import PCA

# UDF pour convertir une liste Python (array) en type DenseVector requis par Spark ML
def to_vector(arr):
    return Vectors.dense(arr) if arr else Vectors.dense([])

# Déclaration comme UDF
to_vector_udf = udf(to_vector, VectorUDT())

# Conversion de la colonne "features_truncated" en colonne "features_vec" de type DenseVector
features_df_vec = features_df.withColumn("features_vec", to_vector_udf(col("features_truncated")))

# Application de la PCA pour réduire les vecteurs à 20 dimensions
pca = PCA(k=20, inputCol="features_vec", outputCol="features_pca")  # k=20 => on garde les 20 composantes principales
pca_model = pca.fit(features_df_vec)  # Entraînement du modèle PCA
pca_result_df = pca_model.transform(features_df_vec)  # Application du modèle PCA

# Affichage des deux premières lignes pour visualiser les vecteurs PCA
pca_result_df.select("features_pca").show(2, truncate=False)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|features_pca                                                                                                                                                                                                                                                                                                                                                                                                          |
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [42]:
# Affichage texte tabulaire de la variance expliquée
print("Composante | Var. expliquée (%) | Var. cumulée (%)")
print("-----------------------------------------------")
for i, (v, c) in enumerate(zip(explained_variance, cumulative_variance)):
    print(f"{i+1:^10} | {v*100:>17.2f} | {c*100:>16.2f}")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Composante | Var. expliqu?e (%) | Var. cumul?e (%)
-----------------------------------------------
    1      |             66.15 |            66.15
    2      |             19.35 |            85.50
    3      |              5.26 |            90.76
    4      |              3.17 |            93.94
    5      |              2.22 |            96.15
    6      |              2.09 |            98.24
    7      |              1.13 |            99.37
    8      |              0.31 |            99.68
    9      |              0.20 |            99.88
    10     |              0.10 |            99.98
    11     |              0.02 |           100.00
    12     |              0.00 |           100.00
    13     |              0.00 |           100.00
    14     |              0.00 |           100.00
    15     |              0.00 |           100.00
    16     |              0.00 |           100.00
    17     |              0.00 |           100.00
    18     |              0.00 |           100.00
 

Les 3 premières composantes principales expliquent plus de 90 % de la variance totale, ce qui indique qu'une réduction de dimension à 3 dimensions est suffisante pour capturer l’essentiel de l'information.

In [39]:
# Mini graphe ASCII de l’éboulis
for i, val in enumerate(cumulative_variance):
    bar = "#" * int(val * 50)  # échelle visuelle
    print(f"Composante {i+1:2} : {bar} ({val:.2%})")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Composante  1 : ################################# (66.15%)
Composante  2 : ########################################## (85.50%)
Composante  3 : ############################################# (90.76%)
Composante  4 : ############################################## (93.94%)
Composante  5 : ################################################ (96.15%)
Composante  6 : ################################################# (98.24%)
Composante  7 : ################################################# (99.37%)
Composante  8 : ################################################# (99.68%)
Composante  9 : ################################################# (99.88%)
Composante 10 : ################################################# (99.98%)
Composante 11 : ################################################# (100.00%)
Composante 12 : ################################################# (100.00%)
Composante 13 : ################################################# (100.00%)
Composante 14 : #####################################

In [38]:
# Spécifie le chemin de destination dans S3
variance_output_path = PATH_Result_S3 + "_explained_variance"

# Sauvegarde du tableau dans S3 au format Parquet
variance_df_spark.write.mode("overwrite").parquet(variance_output_path)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [40]:
# Sauvegarde des résultats (vecteurs réduits via PCA) dans S3 au format Parquet
pca_result_df.write.mode("overwrite").parquet(PATH_Result_S3 + "_pca")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

1. On tronque les vecteurs trop longs (ex. MobileNetV2 sort 1280 valeurs → on garde 200).

2. On transforme les vecteurs en format DenseVector utilisable par Spark ML.

3. On applique PCA pour passer à une représentation réduite (20 dimensions).

4. On sauvegarde les résultats dans S3 au format Parquet.

- L’analyse en composantes principales (PCA) a été utilisée pour réduire la dimension des vecteurs d’images extraits, tout en conservant l’essentiel de l'information visuelle.
- Dans le cadre de cette mission, cette réduction permet d’optimiser les performances du moteur de classification d’images de fruits, essentiel à la première version de l’application mobile.
- Les résultats montrent que les 6 premières composantes suffisent à expliquer plus de 98 % de la variance, ce qui valide l’intérêt de cette compression pour accélérer les traitements sur une infrastructure Big Data.

### 4.10.6 Chargement des données enregistrées et validation du résultat

In [None]:
import pandas as pd
import numpy as np

# 1. Chemins vers les résultats sauvegardés sur S3
PATH_Result_S3 = "s3://aws-emr-studio-065693560228-eu-west-3/Results/"
PATH_PCA_S3 = PATH_Result_S3 + "_pca"

# 2. Lecture des features extraites
print("VALIDATION DES FEATURES EXTRAIRES ")
try:
    df = pd.read_parquet(PATH_Result_S3, engine="pyarrow")
    print("Chargement réussi des features")
    print(f"Nombre d'images traitées : {len(df)}")
    print("Colonnes :", df.columns.tolist())
    print(df.head(2))

    # Vérification que la colonne 'embedding' existe
    if "embedding" in df.columns:
        emb = df["embedding"].iloc[0]
        print(f"Type de vecteur : {type(emb)}, taille : {len(emb)}")
    else:
        print("Colonne 'embedding' manquante !")

except Exception as e:
    print(f"Erreur de chargement des features : {e}")


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

=== VALIDATION DES FEATURES EXTRAIRES ===
Chargement r?ussi des features
Nombre d'images trait?es : 12
Colonnes : ['path', 'modificationTime', 'length', 'content', 'label', 'embedding']
                                                path  ...                                          embedding
0  s3://aws-emr-studio-065693560228-eu-west-3/Tes...  ...  [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...
1  s3://aws-emr-studio-065693560228-eu-west-3/Tes...  ...  [0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, ...

[2 rows x 6 columns]
Type de vecteur : <class 'numpy.ndarray'>, taille : 62720

In [None]:
# Nous arrêtons le serveur Spark pour pouvoir consulter les logs dans l’interface Spark History Server.
spark.stop()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Lors de l'exécution sur EMR, nous avons surveillé la progression des jobs Spark via l'interface Spark History Server (onglet “Monitoring”) accessible depuis la console AWS.

Nous avons également utilisé la console EMR pour consulter :
- Les étapes d'exécution (onglet “Steps”) ;
- Les journaux systèmes (logs stdout/stderr) ;
- Le statut détaillé des jobs (temps d'exécution, partitions, workers, etc.).

Cette étape a permis un **debug efficace**, un **suivi de performance**, et une meilleure **compréhension du parallélisme** dans Spark.


---

### Résiliation EMR

Nous avons procédé à la résiliation du cluster via la **console AWS**, car les commandes `aws emr` ne peuvent pas être exécutées directement depuis un notebook PySpark.


- Les résultats finaux sont correctement stockés sur Amazon S3 :
  1. Les images sources sont classées par catégories dans des dossiers distincts.
  2. Les embeddings générés avec MobileNetV2 sont sauvegardés au format Parquet dans le répertoire `Results/`.
  3. La réduction de dimension par PCA est également enregistrée dans un sous-dossier `Results/_pca/`.
  4. Ces éléments valident la bonne exécution de l’ensemble du pipeline sur le cluster EMR.


**FIN**