<img src="https://raw.githubusercontent.com/Sengsathit/OCR_data_scientist_assets/main/header_fruits.png" alt="Alternative text" />

# Introduction

Ce notebook a pour objectif la mise en place d'une chaîne de traitement dans un environnement Big Data, en s'appuyant sur PySpark et une architecture cloud AWS EMR. Le projet vise à compléter les travaux d'un alternant en développant des briques essentielles pour la gestion de données volumineuses liées à la classification d'images de fruits.

L'accent sera mis sur la scalabilité des traitements et la conformité au RGPD en veillant à l'utilisation de serveurs européens.

# Import des librairies

In [None]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql import Row
from pyspark.ml.feature import PCA as SparkPCA
from pyspark.ml.linalg import Vectors, VectorUDT, Vector

# Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons à nos données sur **S3** comme si elles étaient stockées localement, en définissant les chemins comme suit :

In [None]:
PATH = 's3://ocr-p9'                        # l'URI du bucket du projet
PATH_data = PATH+'/data'                    # l'URI du répertoire des images à traiter
PATH_results = PATH+'/results'              # l'URI des features extraites des images et sauvegardées au format parquet
print(f"PATH :\t\t{PATH}")
print(f"PATH_data :\t{PATH_data}")
print(f"PATH_results :\t{PATH_results}")

# Traitement des données

## Chargement des données

Nous utilisons Spark pour lire des fichiers binaires (comme des images) et les convertir en un format compatible avec le traitement distribué à grande échelle. 

Cela produit un DataFrame Spark contenant à la fois les métadonnées des fichiers et leur contenu binaire, permettant ainsi l'ingestion de grandes quantités d'images dans un environnement distribué.

In [None]:
# Lecture des images
images = ( spark.read.format("binaryFile")

  # Filtre pour ne lire que les fichiers avec l'extension .jpg
  .option("pathGlobFilter", "*.jpg")

  # Recherche récursive dans les sous-dossiers
  .option("recursiveFileLookup", "true")

  # Charge les fichiers à partir du S3
  .load(PATH_data)
)

In [None]:
images.show(5)

In [None]:
# Ajoute une nouvelle colonne 'label' au DataFrame 'images'.
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))

print(images.printSchema())
print(images.select('path','label').show(5,False))

## Préparation du modèle

Nous préparons un modèle de deep learning en utilisant l'architecture MobileNetV2 pré-entraînée sur le dataset ImageNet.

Nous allons utiliser ce modèle pour extraire des caractéristiques (features) des images, plutôt que de les classer. Le modèle prendra en entrée des images de taille 224x224 pixels et produira des features issues de l'avant-dernière couche du réseau. Ces caractéristiques pourront ensuite être utilisées pour d'autres tâches, comme la classification personnalisée sur un jeu de données spécifique ou d'autres types de traitement.

L'objectif ici est donc d'exploiter les capacités du modèle pré-entraîné pour obtenir des représentations d'images riches, sans utiliser la couche de classification finale dédiée aux 1000 classes d'ImageNet.

In [None]:
# Initialisation du modèle MobileNetV2 avec des poids pré-entraînés sur ImageNet
model = MobileNetV2(
    weights='imagenet',         # Chargement des poids pré-entraînés sur le dataset ImageNet.
    include_top=True,           # Inclut la couche fully-connected de sortie utilisée pour la classification sur 1000 classes.
    input_shape=(224, 224, 3)   # Définit la taille d'entrée des images en 224x224 pixels avec 3 canaux (images en couleur).
)


In [None]:
# Création d'un nouveau modèle basé sur MobileNetV2 en modifiant la sortie du modèle
new_model = Model(
    inputs=model.input,                 # On conserve les mêmes entrées que le modèle MobileNetV2 d'origine.
    outputs=model.layers[-2].output     # La sortie est modifiée pour être la sortie de l'avant-dernière couche (au lieu de la dernière couche de classification).
)  


In [None]:
# Diffusion des poids du modèle MobileNetV2 dans l'environnement Spark
model_weights = sc.broadcast(new_model.get_weights())

In [None]:
new_model.summary()

In [None]:
def get_weighted_pretrained_model():
    """
    Crée et retourne un modèle MobileNetV2 avec la couche fully-connected finale retirée.
    
    Utilisation dans un environnement Spark distribué :
    
    - Sur le driver (master) : Les poids du modèle sont initialisés puis diffusés (broadcast) à tous les workers.
    - Sur chaque worker (node) : Cette fonction instancie une copie locale du modèle MobileNetV2, applique les poids diffusés depuis le master (grâce à l'objet broadcast), et prépare le modèle pour traiter des images à l'échelle distribuée.
    
    Le modèle est configuré pour l'extraction de features (sans réentraînement) car toutes les couches sont gelées.

    Returns:
        new_model: Un modèle MobileNetV2 modifié sans la couche de classification finale et avec les poids pré-entraînés appliqués.
    """

    # Crée le modèle MobileNetV2 avec les poids pré-entraînés sur ImageNet
    model = MobileNetV2(
        weights='imagenet',
        include_top=True,           # Inclut la couche fully-connected originale (que nous allons retirer ensuite)
        input_shape=(224, 224, 3)   # Taille d'entrée fixée à 224x224 avec 3 canaux
    )
    
    # Marquer toutes les couches du modèle comme non entraînables afin d'empêcher la mise à jour des poids des couches lors de l'entraînement ou de l'utilisation du modèle
    for layer in model.layers:
        layer.trainable = False
    
    # Créer un nouveau modèle avec l'avant-dernière couche comme sortie (pour extraire des features)
    new_model = Model(
        inputs=model.input,
        outputs=model.layers[-2].output  # Retrait de la dernière couche de classification
    )
    
    # Appliquer les poids diffusés (broadcasted) aux couches du modèle
    new_model.set_weights(model_weights.value)
    
    # Retourner le modèle modifié
    return new_model

## Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

In [None]:
def preprocess_image(content):
    """
    Prend en entrée le contenu binaire d'une image, la redimensionne à 224x224 pixels, la convertit en tableau NumPy et applique un prétraitement spécifique au modèle.

    Args:
    content (bytes): Contenu binaire de l'image.

    Returns:
    numpy.ndarray: Image prétraitée prête pour l'entrée dans un modèle de deep learning.
    """

    # Ouvre l'image à partir du contenu binaire et la redimensionne à 224x224 pixels
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    
    # Convertit l'image redimensionnée en tableau NumPy
    arr = img_to_array(img)
    
    # Applique une transformation de prétraitement pour adapter l'image à l'entrée du modèle
    return preprocess_input(arr)



def extract_features(model, content_series):
    """
    Transforme une série de contenus d'images en une série de vecteurs de caractéristiques.

    Args:
    model (tensorflow.keras.Model): Le modèle de deep learning utilisé pour extraire les caractéristiques.
    content_series (pd.Series): Série Pandas contenant des images (sous forme de contenus binaires).

    Returns:
    pd.Series: Série Pandas contenant des vecteurs de caractéristiques aplatis (output du modèle).
    """

    # Applique la fonction preprocess_image à chaque élément de la série, puis empile les résultats dans un tableau NumPy
    input = np.stack(content_series.map(preprocess_image))

    # Fait une prédiction sur l'ensemble des images prétraitées, c'est une extraction des features dans notre cas
    preds = model.predict(input)

    # Aplati chaque prédiction pour transformer la sortie du modèle en un vecteur 1D
    output = [p.flatten() for p in preds]

    # Retourne les vecteurs aplatis sous forme de série Pandas
    return pd.Series(output)



@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def extract_features_udf(content_series_iter):
    """
    Applique un modèle de deep learning pour extraire des caractéristiques à partir d'un flux de contenus d'images en utilisant une Pandas UDF. Le modèle est initialisé une seule fois par partition.

    Args:
    content_series_iter (Iterator[pd.Series]): Un itérateur sur des séries Pandas de contenus d'images binaires.

    Yields:
    Iterator[array<float>]: Un itérateur sur des tableaux de floats représentant les vecteurs de caractéristiques extraits pour chaque image.
    """

    # Charge un modèle pré-entraîné
    model = get_weighted_pretrained_model()

    # Boucle sur chaque série d'images (une série correspond à une partition de données)
    for content_series in content_series_iter:
        # Applique la fonction extract_features pour extraire les caractéristiques des images de la série
        yield extract_features(model, content_series)

## Exécutions des actions d'extractions de features

In [None]:
# Distribution des données sur 24 partitions, ce qui permet à Spark d'exécuter le traitement en parallèle sur ces partitions. 
# Le nombre de partitions est ajusté pour optimiser l'utilisation des ressources disponibles (CPU, mémoire)
data_features = images.repartition(24).select(
    col("path"),                                        # Sélectionne la colonne 'path' qui contient les chemins des images
    col("label"),                                       # Sélectionne la colonne 'label', représentant l'étiquette de chaque image
    extract_features_udf("content").alias("features")   # Applique la fonction UDF pour extraire les features
)

In [None]:
( 
    data_features
    .write                  # Déclenche une action qui oblige Spark à exécuter toutes les transformations planifiées précédemment
    .mode("overwrite")      # Utilise le mode 'overwrite', ce qui signifie que les fichiers existants seront écrasés s'ils existent
    .parquet(PATH_results)  # Enregistre les données au format Parquet dans le chemin spécifié
)  

# Note : Spark utilise une exécution paresseuse (lazy execution). Cela signifie que les transformations comme repartition ou select
# ne sont pas exécutées immédiatement. Cependant, l'appel à une action, comme write ici, déclenche l'exécution de toutes les étapes
# de transformation précédentes pour écrire les données sur le disque.

# Chargement des données enregistrées et validation du résultat

In [None]:
df = pd.read_parquet(PATH_results, engine='pyarrow')

In [None]:
df.head()

In [None]:
df.loc[0,'features'].shape

In [None]:
df.shape

# ACP

Nous appliquons une réduction de dimension sur les features extraites des images à l'aide de l'Analyse en Composantes Principales (ACP), puis nous sauvegardons ces données réduites dans un fichier CSV pour une utilisation future.

L'objectif ici est de réaliser une réduction de dimension dans un contexte distribué avec Spark, ce qui permet de en manipuler un grand volume de données de manière parallèle et distribuée

## Fonctions réutilisables

In [None]:
def convert_to_vector(arr):
    """
    Convertit un tableau NumPy en un vecteur dense Spark.

    Args:
    arr (numpy.ndarray): Un tableau NumPy contenant les features extraites d'une image.

    Returns:
    pyspark.ml.linalg.DenseVector: Un vecteur dense compatible avec Spark MLlib pour des algorithmes comme PCA.
    """
    return Vectors.dense(arr) # Transforme le tableau en un vecteur dense Spark MLlib



def vector_to_array(vector):
    """
    Convertit un vecteur Spark en une liste (array) Python.

    Args:
    vector (pyspark.ml.linalg.Vector): Un vecteur dense ou creux Spark.

    Returns:
    list: Une liste Python contenant les valeurs du vecteur. Renvoie None si l'entrée n'est pas un vecteur.
    """

    return vector.toArray().tolist() if isinstance(vector, Vector) else None # Vérifie si l'entrée est un vecteur, le convertit en liste si c'est le cas

## Préparation à la réduction de dimension

In [None]:
# Conversion des arrays en vecteurs denses Spark pour être compatibles avec Spark MLlib
array_to_vector_udf = F.udf(convert_to_vector, VectorUDT())

# Transformation du DataFrame pour ajouter une colonne de vecteurs de features
data_features_vectorized = data_features.withColumn("features_vector", array_to_vector_udf("features"))

# Application de l'ACP dans un contexte distribué Spark
# k=1280 signifie qu'on réduit les dimensions des features à 1280 composantes
pca_spark = SparkPCA(k=1280, inputCol="features_vector", outputCol="pca_features_vectors")
pca_model = pca_spark.fit(data_features_vectorized)                 # Entraîne le modèle PCA sur les données vectorisées
pca_spark_result = pca_model.transform(data_features_vectorized)    # Applique la transformation PCA pour réduire les dimensions

# Récupère la variance expliquée par chaque composante principale après l'application de la PCA
explained_variance = pca_model.explainedVariance.toArray()

# Calcule la variance cumulée pour chaque composante. Cela permet de savoir combien de composantes
# sont nécessaires pour capturer un certain pourcentage de la variance totale des données.
cumulative_explained_variance = explained_variance.cumsum()

# Trouve le nombre minimal de composantes qui cumulent au moins 80% de la variance totale
# La méthode 'argmax()' retourne l'indice de la première valeur supérieure ou égale à 0.8,
# donc on ajoute +1 pour obtenir le nombre de composantes nécessaires.
optimal_components_number = (cumulative_explained_variance >= 0.8).argmax() + 1

print('-' * 80)
print(f"Nombre de composantes necessaires pour expliquer 80% de la variance : {optimal_components_number}")
print('-' * 80)

# Crée une UDF pour convertir un vecteur Spark en un tableau Python
vector_to_array_udf = F.udf(vector_to_array, ArrayType(DoubleType()))

# Ajoute une nouvelle colonne "pca_array" au DataFrame, qui contient les résultats PCA sous forme d'array
# Chaque vecteur PCA est converti en un array grâce à la fonction UDF définie ci-dessus.
pca_spark_result_array = pca_spark_result.withColumn("pca_array", vector_to_array_udf("pca_features_vectors"))

pca_spark_result_array.printSchema()

# Sélectionne les composantes qui permettent d'avoir au moins 80% de la variance
# Chaque composante a un alias (un nom unique "F1", "F2", etc...)
pca_cols = [F.col("pca_array")[i].alias(f"F {i + 1}") for i in range(optimal_components_number)]

# Crée un nouveau DataFrame avec les composantes sélectionnées et conserve la colonne "label"
data_extracted_pca = pca_spark_result_array.select(*pca_cols, "label")

## Exécution de la réduction de dimension et sauvegarde des résultats

In [None]:
# Convertit le DataFrame Spark en DataFrame Pandas, ceci déclenche l'exécution des transformations Spark
# Cette action collecte toutes les données réparties sur le cluster pour les ramener en mémoire locale sous forme de DataFrame Pandas
data_extracted_pca_pandas = data_extracted_pca.toPandas()

# Sauvegarde les résultats des composantes PCA dans un fichier CSV pour une utilisation future.
data_extracted_pca_pandas.to_csv('s3://ocr-p9/results_csv/pca_features.csv', index=False)