# Imports and configuration

In [1]:
# Running this cell starts the Spark application

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1722213580150_0004,pyspark,idle,Link,Link,✔



SparkSession available as 'spark'.



In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
3,application_1722213580150_0004,pyspark,idle,Link,Link,✔


In [3]:
import io
import os

import numpy as np
import pandas as pd

from PIL import Image

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

from pyspark.sql.functions import col, udf, pandas_udf, PandasUDFType, element_at, split
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.types import StructType, StructField, ArrayType, FloatType


We define the paths to the folder containing the images and to the folder that will hold the results.

In [4]:
PATH = "s3://p9-data-jaufr-2024"
PATH_Data = PATH + "/Test"
PATH_Result = PATH + "/Results"

print("PATH :         ", PATH)
print("PATH_Data :    ", PATH_Data)
print("PATH_Result :  ", PATH_Result)


PATH :          s3://p9-data-jaufr-2024
PATH_Data :     s3://p9-data-jaufr-2024/Test
PATH_Result :   s3://p9-data-jaufr-2024/Results

# Loading the data

We load the images as binary files with PySpark.

In [5]:
images = (spark.read.format("binaryFile")
          .option("pathGlobFilter", "*.jpg")
          .option("recursiveFileLookup", "true")
          .load(PATH_Data))


In [6]:
images.show(5)


+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p9-data-jauf...|2024-07-28 19:10:01|  7353|[FF D8 FF E0 00 1...|
|s3://p9-data-jauf...|2024-07-28 19:10:01|  7350|[FF D8 FF E0 00 1...|
|s3://p9-data-jauf...|2024-07-28 19:10:01|  7349|[FF D8 FF E0 00 1...|
|s3://p9-data-jauf...|2024-07-28 19:10:01|  7348|[FF D8 FF E0 00 1...|
|s3://p9-data-jauf...|2024-07-28 19:10:02|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

The labels (categories) can be extracted from the image paths: each label is the name of the image's parent folder.

In [7]:
images = images.withColumn("label", element_at(split(images["path"], "/"), -2))

# printSchema() and show() print their output themselves and return None,
# so they are called directly rather than wrapped in print().
images.printSchema()
images.select("path", "label").show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+----------------------------------------------------------+----------+
|path                                                      |label     |
+----------------------------------------------------------+----------+
|s3://p9-data-jaufr-2024/Test/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p9-data-jaufr-2024/Test/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p9-data-jaufr-2024/Test/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p9-data-jaufr-2024/Test/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p9-data-jaufr-2024/Test/Test/Watermelon/r_95_100.jpg |Watermelon|
+----------------------------------------------------------+----------+
only showing top 5 rows
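
As a plain-Python illustration of what `element_at(split(path, "/"), -2)` computes, the sketch below applies the same rule to one path taken from the output above: split on `/` and keep the second-to-last piece, which is the parent folder name. The helper name `label_from_path` is hypothetical and is not part of the notebook.

```python
def label_from_path(path: str) -> str:
    """Return the parent folder name of a file path (hypothetical helper)."""
    # Mirrors element_at(split(path, "/"), -2): index -2 is the second-to-last piece.
    return path.split("/")[-2]


example_path = "s3://p9-data-jaufr-2024/Test/Test/Watermelon/r_106_100.jpg"
label = label_from_path(example_path)
print(label)  # Watermelon
```

This only works because every image sits directly inside a folder named after its category; a different folder layout would need a different index.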

# Preparing the model

We load a MobileNetV2 model pre-trained on ImageNet.

In [8]:
model = MobileNetV2(weights = "imagenet",
                    include_top = True,
                    input_shape= (224, 224, 3))


We remove the head (the classification layer).

In [9]:
new_model = Model(inputs = model.input,
                  outputs = model.layers[-2].output)


We broadcast the weights to the nodes of the cluster.

In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())


We can display a summary of the model's layers.

In [11]:
new_model.summary()


Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

The previous steps can be wrapped in a function.

In [12]:
def model_fn() :
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights = "imagenet",
                        include_top = True,
                        input_shape = (224, 224, 3))
    for layer in model.layers :
        layer.trainable = False
    new_model = Model(inputs = model.input,
                      outputs = model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model


# Feature extraction

In this section, we use PySpark to apply the model in a distributed fashion and extract features from our images.

In [13]:
def preprocess(content) :
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series) :
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # Named `batch` rather than `input` to avoid shadowing the Python builtin.
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf("array<float>", PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    """
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter :
        yield featurize_series(model, content_series)

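The comment in `featurize_udf` about loading the model once can be illustrated without Spark. In the sketch below, a generator performs a costly setup step a single time and then reuses the result for every batch it receives, which is the same shape as a Scalar Iterator pandas UDF. The names `expensive_setup`, `process_batches`, and `setup_calls` are hypothetical stand-ins for `model_fn` and the UDF; no model is actually loaded.

```python
from typing import Callable, Iterator, List

setup_calls = 0  # counts how many times the costly setup runs


def expensive_setup() -> Callable[[int], int]:
    """Stand-in for model_fn(): pretend this is slow, and return a 'model'."""
    global setup_calls
    setup_calls += 1
    return lambda x: x * 2


def process_batches(batches: Iterator[List[int]]) -> Iterator[List[int]]:
    """Same shape as a Scalar Iterator UDF: set up once, then yield one result per batch."""
    model = expensive_setup()  # runs once per call, not once per batch
    for batch in batches:
        yield [model(x) for x in batch]


results = list(process_batches(iter([[1, 2], [3], [4, 5, 6]])))
print(results)      # [[2, 4], [6], [8, 10, 12]]
print(setup_calls)  # 1
```

In Spark, the UDF body runs once per task rather than once for the whole job, so with several partitions the setup would still run once per partition.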



In [14]:
# In case of memory problems:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

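To see why lowering `spark.sql.execution.arrow.maxRecordsPerBatch` can help, here is a back-of-the-envelope estimate of the size of one stacked input batch in `featurize_series`. It assumes each preprocessed image is a 224 x 224 x 3 array of 4-byte floats (the default dtype of `img_to_array`) and ignores the raw JPEG bytes, the model's activations, and any other overhead, so the real footprint is higher. The function name `batch_size_mib` is hypothetical.

```python
# Rough size of the array built by np.stack in featurize_series, per Arrow batch.
HEIGHT, WIDTH, CHANNELS = 224, 224, 3
BYTES_PER_FLOAT32 = 4

bytes_per_image = HEIGHT * WIDTH * CHANNELS * BYTES_PER_FLOAT32


def batch_size_mib(records_per_batch: int) -> float:
    """Approximate size in MiB of one stacked float32 input batch."""
    return records_per_batch * bytes_per_image / 2**20


# 10000 is Spark's default for maxRecordsPerBatch; 1024 is the value in the commented line.
for records in (10000, 1024):
    print(f"{records} records -> {batch_size_mib(records):.0f} MiB")
# 10000 records -> 5742 MiB
# 1024 records -> 588 MiB
```

These figures are upper bounds per batch: a partition that holds fewer rows than the limit produces a smaller batch.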

In [15]:
features_df = images.repartition(24).select(
    col("path"),
    col("label"),
    featurize_udf("content").alias("features")
)


In [16]:
features_df.printSchema()


root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)

We save the resulting representations to the S3 bucket.

In [17]:
features_df.write.mode("overwrite").parquet(PATH_Result)


# Dimensionality reduction (PCA)

The representations are stored as arrays of floats.
To apply PCA in a distributed fashion with PySpark, they must be converted back to vectors.

In [18]:
array_to_vector_udf = udf(lambda array : DenseVector(array), VectorUDT())
features_udf_df = features_df.withColumn("features_vector", array_to_vector_udf(col("features")))
features_udf_df.printSchema()


root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)
 |-- features_vector: vector (nullable = true)

We first fit a PCA with the full number of components.

In [19]:
initial_k = len(features_udf_df.select("features_vector").head()[0])
print(f"Initial number of components: {initial_k}")

pca = PCA(k = initial_k, inputCol = "features_vector", outputCol = "pca_features")
pca_model = pca.fit(features_udf_df)

Initial number of components: 1280

We compute the cumulative explained variance and find the number of components at which it reaches a given threshold (here 90%).

In [20]:
explained_variance = pca_model.explainedVariance.toArray()
cumulative_variance = np.cumsum(explained_variance)

threshold = 0.90
num_components = np.where(cumulative_variance >= threshold)[0][0] + 1
print(f"Number of components for {threshold * 100}% explained variance: {num_components}")

Number of components for 90.0% explained variance: 185
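
The selection rule can be checked by hand on a toy case. The sketch below uses five made-up explained-variance ratios (not values from this dataset) and plain Python instead of NumPy: it builds the running total, then takes the position of the first value that reaches the threshold, plus one to turn a zero-based index into a count.

```python
from itertools import accumulate

# Hypothetical explained-variance ratios for five components (they sum to 1).
explained = [0.5, 0.25, 0.125, 0.0625, 0.0625]
cumulative = list(accumulate(explained))  # running total, like np.cumsum

threshold = 0.90
# Index of the first cumulative value that reaches the threshold, plus 1 to get a count.
n_components = next(i for i, c in enumerate(cumulative) if c >= threshold) + 1

print(cumulative)    # [0.5, 0.75, 0.875, 0.9375, 1.0]
print(n_components)  # 4
```

Three components only reach 87.5%, so the fourth is needed to pass 90%.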

We fit a PCA again with the number of components obtained.

In [21]:
pca_optimal = PCA(k = num_components, inputCol = "features_vector", outputCol = "pca_features")
pca_model_optimal = pca_optimal.fit(features_udf_df)
pca_result_df = pca_model_optimal.transform(features_udf_df)


# Extracting the results

Only the image paths, the labels, and the PCA values are of interest.

In [22]:
extract_pca_values_udf = udf(lambda x : x.values.tolist(), ArrayType(FloatType()))
pca_result_extracted = pca_result_df.withColumn("pca", extract_pca_values_udf(col("pca_features")))
pca_result_extracted = pca_result_extracted.select("path", "label", "pca")
pca_result_extracted.printSchema()


root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- pca: array (nullable = true)
 |    |-- element: float (containsNull = true)

The pca column contains lists of values, which we spread over as many columns as there are components.

In [23]:
pca_columns = [col("pca")[i].alias(f"pca_{i + 1}") for i in range(num_components)]
final_df = pca_result_extracted.select("path", "label", *pca_columns)

print(f"Size: {final_df.count()} rows for {images.count()} images")
final_df.printSchema()

Size: 22688 rows for 22688 images
root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- pca_1: float (nullable = true)
 |-- pca_2: float (nullable = true)
 |-- pca_3: float (nullable = true)
 |-- pca_4: float (nullable = true)
 |-- pca_5: float (nullable = true)
 |-- pca_6: float (nullable = true)
 |-- pca_7: float (nullable = true)
 |-- pca_8: float (nullable = true)
 |-- pca_9: float (nullable = true)
 |-- pca_10: float (nullable = true)
 |-- pca_11: float (nullable = true)
 |-- pca_12: float (nullable = true)
 |-- pca_13: float (nullable = true)
 |-- pca_14: float (nullable = true)
 |-- pca_15: float (nullable = true)
 |-- pca_16: float (nullable = true)
 |-- pca_17: float (nullable = true)
 |-- pca_18: float (nullable = true)
 |-- pca_19: float (nullable = true)
 |-- pca_20: float (nullable = true)
 |-- pca_21: float (nullable = true)
 |-- pca_22: float (nullable = true)
 |-- pca_23: float (nullable = true)
 |-- pca_24: float (nullable = true)
 |-
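
The column spreading can be pictured on a single row. The sketch below is a plain-Python analogue of `col("pca")[i].alias(f"pca_{i + 1}")`, using a made-up row with three components; the helper name `spread_pca` and the values are hypothetical.

```python
def spread_pca(row: dict, n_components: int) -> dict:
    """Replace the 'pca' list with one 'pca_<i>' entry per component (1-based names)."""
    flat = {key: value for key, value in row.items() if key != "pca"}
    for i in range(n_components):
        flat[f"pca_{i + 1}"] = row["pca"][i]
    return flat


# Made-up row with three components; the values are illustrative only.
row = {"path": "Watermelon/r_106_100.jpg", "label": "Watermelon", "pca": [1.5, -0.25, 3.0]}
flat_row = spread_pca(row, 3)
print(flat_row)
# {'path': 'Watermelon/r_106_100.jpg', 'label': 'Watermelon', 'pca_1': 1.5, 'pca_2': -0.25, 'pca_3': 3.0}
```

The list index is zero-based while the column names start at `pca_1`, hence the `i + 1` in the name.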

We save the result in CSV format to the S3 bucket.

In [25]:
csv_path = PATH + "/PCA_Results_CSV"
final_df.coalesce(1).write.mode("overwrite").option("header", "true").csv(csv_path)
