# 🧭 Projet Fruits! – Traitement Big Data d'images en local (avec PySpark + Transfert Learning)

## 1. Préambule

### 1.1 Objectif global du projet

La start-up fictive **Fruits!**, active dans le secteur de l’AgriTech, souhaite développer une application mobile capable de reconnaître automatiquement des fruits à partir d’une photo prise par un utilisateur.

L’objectif de cette première phase est de construire une **chaîne de traitement Big Data** capable :
- de charger les images,
- d’extraire automatiquement des vecteurs de caractéristiques via **transfert learning**,
- de les sauvegarder dans un format structuré pour des traitements en aval (PCA, clustering, etc.).

---

### 1.2 Objectif de ce notebook

Ce notebook a pour but de **valider la chaîne de traitement en local** :
- en **utilisant PySpark** pour simuler un environnement distribué,
- en **appliquant un modèle MobileNetV2** via **transfert learning** pour extraire les features,
- en **utilisant toutes les étiquettes** présentes dans le dossier `fruits-360_dataset/fruits-360/Training`,
- et en **sauvegardant les résultats** au format `parquet`.

Cette version locale nous permettra ensuite de passer à une version **cloud distribuée (AWS EMR + S3)**.

---

### 1.3 Structure du notebook

Ce projet se structure comme suit :

1. **Préambule** : objectif, contexte et plan.
2. **Environnement de travail** : imports, paths, SparkSession.
3. **Chargement des données** : lecture binaire des images et extraction des labels.
4. **Préparation du modèle MobileNetV2** (transfert learning).
5. **Extraction des vecteurs de caractéristiques avec Pandas UDF**.
6. **Sauvegarde des résultats au format `parquet`**.
7. **Réduction de dimension + visualisation**.
8. **Conclusion** et perspective cloud.

---

## 2. Environnement de travail

### 2.1 Imports des librairies

Nous importons ici les bibliothèques nécessaires à notre projet local.  
Ces bibliothèques sont compatibles avec un futur déploiement sur cluster Spark via AWS EMR.

In [1]:
import os
import io
import pandas as pd
import numpy as np
from PIL import Image

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

2025-05-26 01:01:44.405390: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-05-26 01:01:44.440256: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-05-26 01:01:44.476355: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1748214104.538502   24102 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1748214104.553792   24102 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1748214104.650408   24102 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linkin

### 2.2 Définition des chemins d’accès

Dans cette version locale, nous utilisons un extrait du dataset officiel Fruits 360, téléchargé et placé dans le dossier fruits-360_dataset/fruits-360/Training.

Les images seront chargées depuis ce répertoire, et les résultats seront enregistrés localement dans un dossier Results_Local.

> 🧠 Le dataset contient des sous-dossiers par classe de fruit (ex : Apple Braeburn, Banana, etc.), ce qui permet d’extraire automatiquement les labels à partir du chemin d’accès.

> 💡 Adaptation WSL : Le disque C: de Windows est monté sous /mnt/c/ dans WSL

In [2]:
# Chemins pour WSL (converti depuis C:/...)
PATH_ROOT = "/mnt/c/Users/inesn/OneDrive - Université de Paris/projet8/fruits-360_dataset/fruits-360"
PATH_DATA = os.path.join(PATH_ROOT, "Training")
PATH_RESULTS = os.path.join(PATH_ROOT, "Results_Local")

# Vérification de l'existence du dossier de données
assert os.path.exists(PATH_DATA), f"❌ Le dossier n'existe pas : {PATH_DATA}"

print("📂 Dossier de données :", PATH_DATA)
print("📁 Dossier de sortie :", PATH_RESULTS)

📂 Dossier de données : /mnt/c/Users/inesn/OneDrive - Université de Paris/projet8/fruits-360_dataset/fruits-360/Training
📁 Dossier de sortie : /mnt/c/Users/inesn/OneDrive - Université de Paris/projet8/fruits-360_dataset/fruits-360/Results_Local


In [3]:
import os

# Dossier contenant les classes
label_folders = os.listdir(PATH_DATA)

# Filtrer uniquement les dossiers (au cas où il y aurait des fichiers)
label_folders = [f for f in label_folders if os.path.isdir(os.path.join(PATH_DATA, f))]

# Affichage
print(f"🍇 Nombre de classes de fruits : {len(label_folders)}")
print("📁 Exemples de classes :", label_folders[:10])

🍇 Nombre de classes de fruits : 131
📁 Exemples de classes : ['Apple Braeburn', 'Apple Crimson Snow', 'Apple Golden 1', 'Apple Golden 2', 'Apple Golden 3', 'Apple Granny Smith', 'Apple Pink Lady', 'Apple Red 1', 'Apple Red 2', 'Apple Red 3']


### 2.3 Création de la SparkSession

Nous créons une session Spark nommée `Fruits_Local`.  
Dans ce mode, Spark est exécuté **en local** mais permet déjà de simuler un environnement distribué.

In [4]:
spark = (
    SparkSession.builder
    .appName("Fruits_Local")
    .master("local[*]")  # Utilise tous les cœurs disponibles
    .config("spark.sql.parquet.writeLegacyFormat", "true")  # Pour compatibilité des formats parquet
    .getOrCreate()
)

sc = spark.sparkContext
print("✅ SparkSession initialisée :", spark.version)

25/05/26 01:01:52 WARN Utils: Your hostname, SUPER-INES resolves to a loopback address: 127.0.1.1; using 172.20.235.203 instead (on interface eth0)
25/05/26 01:01:52 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/26 01:01:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/05/26 01:01:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


✅ SparkSession initialisée : 3.4.1


## 3. Chargement et préparation des données
### 3.1 Chargement des images

Nous chargeons les images au format binaire à partir du répertoire Training du dataset Fruits 360.

Spark lit l’ensemble des fichiers .jpg de manière récursive dans les sous-dossiers, où chaque sous-dossier représente une classe de fruit (label).


In [5]:
from pyspark.sql.functions import col, element_at, split

# Charger toutes les images
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(f"file://{PATH_DATA}")

images.printSchema()
images.show(5)

                                                                                

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)



                                                                                

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|file:/mnt/c/Users...|2021-09-12 19:27:00|  7437|[FF D8 FF E0 00 1...|
|file:/mnt/c/Users...|2021-09-12 19:27:00|  7434|[FF D8 FF E0 00 1...|
|file:/mnt/c/Users...|2021-09-12 19:26:52|  7424|[FF D8 FF E0 00 1...|
|file:/mnt/c/Users...|2021-09-12 19:27:00|  7423|[FF D8 FF E0 00 1...|
|file:/mnt/c/Users...|2021-09-12 19:27:00|  7416|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows



### 3.2 Ajout des labels

Nous extrayons le nom du dossier parent (qui correspond à la classe de fruit) pour l’ajouter en tant que label à chaque image.

In [6]:
# Extraire les labels à partir du chemin
images = images.withColumn(
    "label",
    element_at(split(col("path"), "/"), -2)
)

# Afficher le nombre de classes
labels = images.select("label").distinct()
print("🍒 Nombre de classes :", labels.count())

N_IMAGES_PER_CLASS = 5

# Sélectionner 5 images par classe
sampled_df = images.groupBy("label").applyInPandas(
    lambda pdf: pdf.sample(n=min(N_IMAGES_PER_CLASS, len(pdf)), random_state=42),
    schema=images.schema
)



🍒 Nombre de classes : 131


                                                                                

## 4. Préparation du modèle MobileNetV2 (transfert learning)
### 4.1 Chargement du modèle MobileNetV2

Nous appliquons ici la technique du transfert learning, en important le modèle MobileNetV2 pré-entraîné sur ImageNet.
Nous supprimons la dernière couche (Softmax) pour ne conserver que le vecteur de caractéristiques de dimension (1, 1, 1280).

In [7]:
# Chargement du modèle MobileNetV2 pré-entraîné
model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))

# Suppression de la couche de classification finale (Softmax)
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

# Résumé du modèle pour vérifier la sortie attendue (1280)
new_model.summary()

E0000 00:00:1748215141.539632   24102 cuda_executor.cc:1228] INTERNAL: CUDA Runtime error: Failed call to cudaGetRuntimeVersion: Error loading CUDA libraries. GPU will not be used.: Error loading CUDA libraries. GPU will not be used.
W0000 00:00:1748215141.540830   24102 gpu_device.cc:2341] Cannot dlopen some GPU libraries. Please make sure the missing libraries mentioned above are installed properly if you would like to use GPU. Follow the guide at https://www.tensorflow.org/install/gpu for how to download and setup the required libraries for your platform.
Skipping registering GPU devices...


### 4.2 Broadcast des poids pour Spark

Les poids du modèle sont chargés une seule fois sur le driver, puis diffusés à l’ensemble des workers avec SparkContext.broadcast.

In [8]:
# Diffusion des poids vers les workers
broadcast_weights = sc.broadcast(new_model.get_weights())

### 4.3 Définition d’une fonction model_fn()

Cette fonction est appelée dans les pandas UDF pour recharger le modèle dans chaque worker Spark.

In [9]:
def model_fn():
    """
    Retourne une instance de MobileNetV2 sans la couche finale,
    avec les poids broadcastés rechargés.
    """
    model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input, outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model

## 5. Extraction des vecteurs de caractéristiques avec Pandas UDF
### 5.1 Prétraitement des images (redimensionnement + normalisation)

Avant de les passer dans MobileNetV2, les images doivent :

- être redimensionnées en 224x224 pixels,

- converties en tableau NumPy,

- normalisées avec preprocess_input().

In [10]:
def preprocess(content):
    """
    Convertit une image binaire en tableau NumPy normalisé pour MobileNetV2.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

### 5.2 Application du modèle sur une série d’images

On applique le modèle à une série d’images pour en extraire un vecteur par image.

In [11]:
def featurize_series(model, content_series):
    """
    Applique MobileNetV2 à une série d’images brutes.
    Retourne une série de vecteurs de caractéristiques.
    """
    input_array = np.stack(content_series.map(preprocess))
    preds = model.predict(input_array)
    # Aplatir les tenseurs pour les stocker proprement dans un DataFrame Spark
    return pd.Series([p.flatten() for p in preds])

### 5.3 Définition d’un pandas_udf Spark

Nous utilisons ici un UDF avec itérateur (SCALAR_ITER) qui permet de :

- ne charger le modèle qu’une seule fois par partition Spark,

- appliquer le modèle à des lots d’images.

In [12]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    Pandas UDF distribué pour transformer les images en vecteurs.
    """
    model = model_fn()  # Charge MobileNetV2 sans la dernière couche
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



### 5.4 Application du UDF sur les images

On applique maintenant ce UDF à notre DataFrame Spark pour créer un DataFrame features_df contenant :

- le chemin de l’image,

- son label (classe de fruit),

- son vecteur de caractéristiques (1280 valeurs).

In [13]:
features_df = sampled_df.repartition(2).select(
    col("path"),
    col("label"),
    featurize_udf("content").alias("features")
)

In [14]:
features_df.show(3, truncate=False)

2025-05-26 01:46:13.177646: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-05-26 01:46:13.182487: I external/local_xla/xla/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2025-05-26 01:46:13.193281: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:467] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
E0000 00:00:1748216773.214370   25992 cuda_dnn.cc:8579] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
E0000 00:00:1748216773.220703   25992 cuda_blas.cc:1407] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered
W0000 00:00:1748216773.277563   25992 computation_placer.cc:177] computation placer already registered. Please check linkage and avoid linkin

+-------------------------------------------------------------------------------------------------------------------------------+------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m2s[0m 2s/step
                                                                                

## 6. Sauvegarde des résultats au format parquet

Après avoir généré les vecteurs de caractéristiques avec MobileNetV2 pour chaque image, nous enregistrons le DataFrame features_df dans un fichier au format Parquet. Ce format est optimisé pour le stockage structuré et est compatible avec les traitements distribués en aval (ex : PCA, clustering, etc.).

💾 Le fichier sera écrit dans le dossier défini plus tôt : PATH_RESULTS.

In [15]:
# 6.1 Paramètre de sauvegarde
SAVE_PARQUET = False 

# 6.2 Affichage du chemin cible
print("📁 Enregistrement dans :", PATH_RESULTS)

# 6.3 Sauvegarde conditionnelle
if SAVE_PARQUET:
    # Enregistrement du DataFrame Spark au format Parquet
    print("📦 Enregistrement du fichier Parquet...")
    features_df.write.mode("overwrite").parquet(PATH_RESULTS)
    print("✅ Fichier Parquet écrit avec succès.")
else:
    print("⚠️ Sauvegarde désactivée — aucun fichier écrit.")


📁 Enregistrement dans : /mnt/c/Users/inesn/OneDrive - Université de Paris/projet8/fruits-360_dataset/fruits-360/Results_Local
⚠️ Sauvegarde désactivée — aucun fichier écrit.


## 7. Réduction de dimension et visualisation (ACP / PCA)
### 7.1 Chargement des résultats featurisés

Nous rechargeons les données sauvegardées au format parquet contenant les vecteurs de caractéristiques extraits avec MobileNetV2.

In [16]:
#df = pd.read_parquet(PATH_RESULTS, engine="pyarrow")
#print("✅ Données chargées :", df.shape)
#df.head()

### 7.2 Conversion des vecteurs pour l’ACP

Les vecteurs sont actuellement des listes de 1280 floats stockées dans une colonne. Nous les convertissons en une matrice X utilisable pour la réduction de dimension.

In [None]:
df = features_df.toPandas()  # conversion Spark → Pandas
X = np.array(df["features"].tolist())

[Stage 14:=>                                                    (40 + 8) / 2116]

### 7.3 Réduction de dimension avec PCA

Nous appliquons une Analyse en Composantes Principales (PCA) pour réduire la dimension des vecteurs tout en conservant un maximum de variance.

In [None]:
from sklearn.decomposition import PCA

# Réduction à 2 dimensions pour visualisation
pca = PCA(n_components=2)
X_pca = pca.fit_transform(X)

# Ajout au DataFrame
df["pca1"] = X_pca[:, 0]
df["pca2"] = X_pca[:, 1]


### 7.4 Visualisation 2D des fruits

In [None]:
pip install matplotlib

In [None]:
pip install seaborn

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns

plt.figure(figsize=(12, 8))
sns.scatterplot(
    x="pca1", y="pca2", hue="label", data=features_df, palette="tab10", s=60, alpha=0.7, edgecolor="k"
)
plt.title("Projection PCA (2D) des fruits featurisés par MobileNetV2", fontsize=14)
plt.xlabel("Composante principale 1")
plt.ylabel("Composante principale 2")
plt.legend(bbox_to_anchor=(1.05, 1), loc='upper left', borderaxespad=0.)
plt.tight_layout()
plt.show()
