Chaine de prétraitement des données

# Librairies

In [11]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os
from pathlib import Path
import random
import shutil

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

# Création du folder Test1

On va sélectionner pour chaque catégorie d'image 3 images aléatoires et créer une arborescence.

Le dossier d'images utilisé pour créer Test1 sont celles de ce dossier : fruits-360_dataset\fruits-360\Test

## Paths

In [12]:
random.seed(42)

In [13]:
images_folder_filepath = "/home/maxime/projects/P9/data/Images/Test"
main_folder = Path(images_folder_filepath)

test1_folder_filepath = "/home/maxime/projects/P9/data/Images/Test1"
output_folder = Path(test1_folder_filepath)

IMAGE_EXTENSIONS = {".jpg"} # Définition des extensions d'images valides
NUM_IMAGES_TO_SELECT = 3
selected_image_paths = [] # La liste qui contiendra tous les chemins des images sélectionnées

## Sélection des images et de leurs classes

In [14]:
if output_folder.is_dir():
    print(f"Le dossier d'images Test1 existe déja")
else:
    print(f"Sélection des images à mettre dans le dossier d'images Test1")

    # On parcourt chaque élément (fichier ou dossier) dans le dossier principal
    for subfolder in main_folder.iterdir():
        
        # Vérifiez que l'élément est bien un dossier
        if subfolder.is_dir():
            print(f"--- Traitement du sous-dossier : {subfolder.name} ---")
            
            # Listez toutes les images valides dans ce sous-dossier
            images_in_subfolder = [
                item for item in subfolder.iterdir() 
                if item.is_file() and item.suffix.lower() in IMAGE_EXTENSIONS
            ]
            
            num_found = len(images_in_subfolder) #Permet de d'assurer qu'on a au moins 3 images à sélectionner dedans

            # S'il y a assez d'images, on en sélectionne 3 au hasard
            if num_found >= NUM_IMAGES_TO_SELECT:
                print(f"Trouvé {num_found} images. Sélection de {NUM_IMAGES_TO_SELECT} au hasard.")
                selected_samples = random.sample(images_in_subfolder, NUM_IMAGES_TO_SELECT)
            # Sinon, on sélectionne toutes les images disponibles et afficher un avertissement
            else:
                print(f"AVERTISSEMENT : Trouvé seulement {num_found} image(s). Sélection de toutes les images.")
                selected_samples = images_in_subfolder.copy()

            # Ajouter les chemins sélectionnés à notre liste finale
            # On utilise extend() pour ajouter tous les éléments de la liste 'selected_samples'
            selected_image_paths.extend(selected_samples)
            print("-" * (len(subfolder.name) + 30)) # Ligne de séparation pour la lisibilité

    # --- 3. Affichage des résultats ---

    print("\n--- Sélection des images pour le dossier Test1 terminée ---")
    print(f"Nombre total d'images sélectionnées : {len(selected_image_paths)}")

    # Affichons les 10 premiers chemins pour vérifier
    print("\nVoici les 10 premiers chemins sélectionnés :")
    for path in selected_image_paths[:10]:
        print(path)
    print(f"{len(selected_image_paths)} images ont été selectionnées")

    print("\nRécupération de la catégorie pour chaque image sélectionnée")

    loaded_images = []
    labels = []

    for path in selected_image_paths:
        try:
            # a) Extraire le nom du dossier parent comme classe (label)
            # Si le chemin est /.../chat/image01.jpg, path.parent.name sera "chat"
            label = path.parent.name
            
            # b) Charger l'image avec Pillow
            image = Image.open(path)
            
            # c) On va convertir l'image en mode 'RGB'
            # Cela garantit que toutes les images ont 3 canaux de couleur (Rouge, Vert, Bleu).
            # permet d'éviter les problèmes avec les images en noir et blanc (mode 'L') ou avec canal alpha (mode 'RGBA').
            image_rgb = image.convert('RGB')
            
            # d) Ajouter l'image chargée et son label aux listes
            loaded_images.append(image_rgb)
            labels.append(label)

        except Exception as e:
            # Gérer le cas où une image est corrompue ou ne peut être ouverte
            print(f"AVERTISSEMENT : Impossible de charger l'image {path}. Erreur : {e}")
            # On passe à l'image suivante sans planter le script
            continue

Le dossier d'images Test1 existe déja


## Dataframe contenant les paths et leurs classes

In [15]:
pd.set_option('max_colwidth', 400)

In [16]:
"""
images_test1_df = pd.DataFrame({
    'image': loaded_images,
    'label': labels,
    'image_path': selected_image_paths
})

images_test1_df
"""

"\nimages_test1_df = pd.DataFrame({\n    'image': loaded_images,\n    'label': labels,\n    'image_path': selected_image_paths\n})\n\nimages_test1_df\n"

## Création du folder avec les images sélectionnées

In [17]:
if output_folder.is_dir():
    print(f"Le dossier d'images Test1 existe déja")
else:
    print(f"Création du dossier d'images Test1")

    output_folder.mkdir(exist_ok=True) # Création du dossier

    copied_files_count = 0
    for source_path in selected_image_paths:
        try:
            # Extraire le nom de la classe (ex: 'chat', 'chien')
            class_name = source_path.parent.name
            
            # Créer le dossier pour cette classe dans 'Test1'
            destination_class_folder = output_folder / class_name
            destination_class_folder.mkdir(exist_ok=True)
            
            # Définir le chemin complet du fichier de destination
            destination_file_path = destination_class_folder / source_path.name
            
            # Copier le fichier
            shutil.copy2(source_path, destination_file_path)
            copied_files_count += 1
            
        except Exception as e:
            print(f"AVERTISSEMENT : Impossible de copier {source_path}. Erreur : {e}")

    print(f"\nOpération terminée. {copied_files_count} fichiers copiés dans {output_folder}.")

    

Le dossier d'images Test1 existe déja


# Création de la SparkSession

In [18]:
spark = (SparkSession
             .builder # .builder starts the construction process
             .appName('P9')
             .master('local') # .master specifies where the Spark application runs.
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate() # If a SparkSession with the same configuration already exists, it returns that session. - If no session exists, it creates a new one based on the builder settings
)

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/07/02 07:08:45 WARN Utils: Your hostname, DESKTOP-55DD1RU, resolves to a loopback address: 127.0.1.1; using 172.19.19.203 instead (on interface eth0)
25/07/02 07:08:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/02 07:08:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [19]:
spark

In [20]:
sc = spark.sparkContext

In [21]:
sc

# Chargement des images

In [22]:
"""
images = 
   # On dit a Spark qu'il a lire des données depuis une souyrce externe au format binaire
   spark.read.format("binaryFile") \
    # On filtre les fichiers en fonction de leur format ici on veut jpg
  .option("pathGlobFilter", "*.jpg") \
    # Indique si Spark doit regarder à l'intérieur des sous-dossiers, True pour oui
  .option("recursiveFileLookup", "true") \
    # spécifie le chemin de la source de données
  .load(images_folder_filepath)
"""

'\nimages = \n   # On dit a Spark qu\'il a lire des données depuis une souyrce externe au format binaire\n   spark.read.format("binaryFile")     # On filtre les fichiers en fonction de leur format ici on veut jpg\n  .option("pathGlobFilter", "*.jpg")     # Indique si Spark doit regarder à l\'intérieur des sous-dossiers, True pour oui\n  .option("recursiveFileLookup", "true")     # spécifie le chemin de la source de données\n  .load(images_folder_filepath)\n'

In [23]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(test1_folder_filepath)

                                                                                

In [24]:
images # DataFrame images qui a été créé avec spark.read.format("binaryFile") et contient pour chaque image une colonne path de type file:/home/maxime/projects/P9/data/Images/Test1/classe_de_l'image/image_01.jpg

DataFrame[path: string, modificationTime: timestamp, length: bigint, content: binary]

In [25]:
# Crée un nouveau dataframe a partir de celui que l'on avait juste avant # split est une fonction de Spark SQL qui prend une chaîne de caractères et la découpe en un tableau (array) en utilisant un délimiteur
# split est une fonction de Spark SQL qui prend une chaîne de caractères et la découpe en un tableau (array) en utilisant un délimiteur
# withColumn est la méthode standard de Spark pour ajouter une nouvelle colonne à un DataFrame (ou remplacer une colonne existante)

images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------------------------------------------------------------+--------------+
|path                                                                      |label         |
+--------------------------------------------------------------------------+--------------+
|file:/home/maxime/projects/P9/data/Images/Test1/Watermelon/r_71_100.jpg   |Watermelon    |
|file:/home/maxime/projects/P9/data/Images/Test1/Pineapple Mini/268_100.jpg|Pineapple Mini|
|file:/home/maxime/projects/P9/data/Images/Test1/Watermelon/272_100.jpg    |Watermelon    |
|file:/home/maxime/projects/P9/data/Images/Test1/Watermelon/202_100.jpg    |Watermelon    |
|file:/home/maxime/projects/P9/data/Images/Test1/Cauliflower/r_290_100.jpg |Cauliflower   |
+-----------------------------------------------------

                                                                                

In [26]:
print(images)

DataFrame[path: string, modificationTime: timestamp, length: bigint, content: binary, label: string]


# Chargement de MobileNetV2

## Configuration et distribution

In [27]:
MobileNetV2

<function keras.src.applications.mobilenet_v2.MobileNetV2(input_shape=None, alpha=1.0, include_top=True, weights='imagenet', input_tensor=None, pooling=None, classes=1000, classifier_activation='softmax', name=None)>

In [30]:
model = MobileNetV2(weights='imagenet',
                    include_top=True, # On peut laisser True car on ne va pas l'enlever après
                    input_shape=(224, 224, 3))

model.trainable = False

new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output) # On la top layer on classification layer

new_model.summary()

output de la derniere couche : tenseur 4D de la forme (None, 1, 1, 1280) --> None car le batch size n'a pas changé, 1 et 1 pour hauteur et largeur et 1280 caractéristiques ou features extraites de l'image. Donc on peut considérer que l'on a comme sortie un vecteur de caractéristiques de dimensions (1,1,1280).

## Broadcast des poids

In [31]:
brodcast_weights = sc.broadcast(new_model.get_weights())

On a besoin de charger le modèle une première fois et on récupère les poids et on lance l'opération de diffusion avec sc.broadcast().

L'action de charger la premiere fois le modele et de broadcast les poids est pour préparer le terrain afin que quand on charge le modèle sur chacun des workers ils auront juste a recuperer les poids avec new_model.set_weights(brodcast_weights.value)

Si on ne fait pas ça on aura un bottleneck a chaque tache ou les workers devront redemander les poids au driver

Cette fonction servira a l'instanciation locale 

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

# Définition du processus de chargement des images et application <br/>de leur featurisation à travers l'utilisation de pandas UDF

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)