# P9 : Preprocessing de données visuelles et PCA

Objectif du notebook :

Réaliser en local le processing des données et la réduction de dimensionnalité des vecteurs prédits avec le modèle entrainé dans un code Pyspark en vue d'un futu déploiement dans le cloud AWS

## 1- Import des librairies

In [None]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

In [None]:
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession


## 2- Data

In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
PATH = os.getcwd()
PATH_Data = '/content/drive/MyDrive/fruits/fruits-360_dataset/fruits-360/Test'
PATH_Result = PATH+'/data/Results'

In [None]:
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        /content
PATH_Data:   /content/drive/MyDrive/fruits/fruits-360_dataset/fruits-360/Test
PATH_Result: /content/data/Results


## 3- Instanciation des outils Spark

In [None]:
# création d'une session spark = pt d'entrée dans les fonctionnalités spark
# nécessaire pour contruire un dataframe spark
spark = (SparkSession
             .builder
             .appName('P8')
             .master('local')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate()
)

In [None]:
# instance d'un sparkcontext
# permet d'accéder aux fonctionnalités de bas niveau de spark
sc = spark.sparkContext

In [None]:
spark

In [None]:
# Configurations pour optimiser les performances et la gestion de la mémoire
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", 1024)
spark.conf.set("spark.sql.shuffle.partitions", 4)

## 4- Loading des images et création d'un dataframe Spark

In [None]:
# lecture distribuée des images
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data) \
  .limit(2)

In [None]:
type(images)

On a un spark dataframe

In [None]:
# extraction du label depuis le path
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))

In [None]:
print(images.printSchema())

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None


In [None]:
print(images.select('path','label').show(5,False))

+----------------------------------------------------------------------------------------------+----------+
|path                                                                                          |label     |
+----------------------------------------------------------------------------------------------+----------+
|file:/content/drive/MyDrive/fruits/fruits-360_dataset/fruits-360/Test/Watermelon/r_106_100.jpg|Watermelon|
|file:/content/drive/MyDrive/fruits/fruits-360_dataset/fruits-360/Test/Watermelon/r_109_100.jpg|Watermelon|
+----------------------------------------------------------------------------------------------+----------+

None


## 5- Loading du modèle pré-entrainé et transfert learning

In [None]:
# loading du modèle pré-entrainé
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5
[1m14536120/14536120[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 0us/step


In [None]:
# suppression de la dernière couche de classification
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [None]:
new_model.summary()

On obtient en sortie des vecteurs denses de taille 1280

In [None]:
# broadcast des poids du modèle aux workers spark : permet d'éviter de renvoyer les poids à chaque appel
brodcast_weights = sc.broadcast(new_model.get_weights())

## 5- Inférence sur le modèle et batching des prédictions

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    :return: a preprocessed image tensor
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)


# définition d'un pandas UDF scalaire itératif (le format pandas spark)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER) # active pandas + Arrow // batch + iterateur
# distribue sur les workers, chaque elt de sortie est un batch de données
def featurize_udf(content_series_iter): # batch spark
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series) # charge le modèle une fois et renvoie un batch



- Spark découpe les données en partitions
- chaque partition est envoyée au worker
- les données sont traitées par batch

content_series = un batch d’images

le modèle est chargé une seule fois par worker

chaque yield renvoie un batch de résultats

les prédictions sont batchées

Spark ne veut pas de float arr mais des vecteurs UDT

=> il faut convertir en vecteur DenseVector

## 6- PCA

Pour faire une PCA avec Spark il faut :

- modifier les vecteurs en DenseVector
- centrer /  réduire les données = standardisation
- importer la PCA Spark
- créer le pipeline

In [None]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT, Vectors
# modification des vecteurs
array_to_vector = udf(lambda x: Vectors.dense(x), VectorUDT())

1- On va créer une colonne features_array pour associer les arrays à chaque image correspondantes

In [None]:
df = images.withColumn(
    "features_array",
    featurize_udf("content")
)

In [None]:
df.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
 |-- features_array: array (nullable = true)
 |    |-- element: float (containsNull = true)



In [None]:
df.show()

+--------------------+-------------------+------+--------------------+----------+--------------------+
|                path|   modificationTime|length|             content|     label|      features_array|
+--------------------+-------------------+------+--------------------+----------+--------------------+
|file:/content/dri...|2026-01-06 08:24:00|  7353|[FF D8 FF E0 00 1...|Watermelon|[0.0, 0.23397025,...|
|file:/content/dri...|2026-01-06 08:24:00|  7350|[FF D8 FF E0 00 1...|Watermelon|[0.08841883, 0.83...|
+--------------------+-------------------+------+--------------------+----------+--------------------+



2- On convertit les arrays en vecteurs

In [None]:
df = df.withColumn(
    "features",
    array_to_vector("features_array")
)

In [None]:
df.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
 |-- features_array: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- features: vector (nullable = true)



In [None]:
df.columns

['path',
 'modificationTime',
 'length',
 'content',
 'label',
 'features_array',
 'features']

On retrouve bien les deux éléments nécessaires à la PCA

1- les arrays<float> du pandas UDF

2- VectorUDT utilisable par Spark MLlib

In [None]:
# mise en cache du dataframe en mémoire
# permet d'éviter de recalculer les features à chaque action
df = df.cache()
df.count()

2

### 3-  standardisation des données

MobileNetV2 produit des features avec des valeurs sur des échelles différentes
La PCA est très sensible à l’échelle : sinon certaines dimensions vont dominer artificiellement.
On utilise StandardScaler pour ramener à des échelles comparatives

In [None]:
from pyspark.ml.feature import StandardScaler

In [None]:
scaler = StandardScaler(
    inputCol="features",
    outputCol="features_scaled",
    withMean=False,   # centre les données (moyenne 0)
    withStd=True     # met à l'échelle avec écart-type 1
)


In [None]:
scaler_model = scaler.fit(df).transform(df)

In [None]:
df.columns

['path',
 'modificationTime',
 'length',
 'content',
 'label',
 'features_array',
 'features']

In [None]:
from pyspark.ml.feature import PCA

In [None]:
pca = PCA(
    k=5,
    inputCol="features_scaled",
    outputCol="features_pca"
)

In [None]:
pca_model = pca.fit(scaler_model)
df_pca = pca_model.transform(scaler_model)

In [None]:
df_pca.select("features_pca").show(truncate=False)
print(df_pca.select("features_pca").first()[0].size)

+-----------------------------------------------------------------------------------------------+
|features_pca                                                                                   |
+-----------------------------------------------------------------------------------------------+
|[144.19234619106516,50.367571810036296,-34.98492965611851,-9.49177457709768,-69.59508653945116]|
|[99.69515526849125,50.36757181003622,-34.98492965611856,-9.491774577097686,-69.59508653945102] |
+-----------------------------------------------------------------------------------------------+

5


In [None]:
# on check la taille du vecteur
df_pca.select("features_pca").head()[0].size

5

In [None]:
df_pca.printSchema()

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)
 |-- features_array: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- features: vector (nullable = true)
 |-- features_scaled: vector (nullable = true)
 |-- features_pca: vector (nullable = true)



In [None]:
print(PATH_Result)

/content/data/Results


In [None]:
# sauvegarde des résultats
# sauvegarde du dataframe PCA dans un format parquet (format colonne optimisé pour spark)
df_pca.write.mode("overwrite").parquet(PATH_Result)

In [None]:
# lecture des résultats
df = pd.read_parquet(PATH_Result, engine='pyarrow')