# Réalisez un traitement dans un environnement Big Data sur le Cloud

Puisque les tests en local se sont bien déroulés, nous allons maintenant exécuter notre script dans un environnement cloud sur un plus grand nombre d'images.

Nous allons passer par la plateforme AWS et utiliser deux services :

- EMR : notre cluster de machine qui exécutera notre notebook avec Spark, grâce à JupyterHub qui sera installé sur le driver.
- S3 : notre service de stockage qui contiendra nos images à traiter, le notebook à exécuter, les logs d'exécutions de Spark et nos fichiers de sortie.

### Démarrage de la session Spark

Le code sera exécuté depuis JupyterHub hébergé sur un cluster EMR AWS.

Avant de commencer, il faut s'assurer d'utiliser le kernel PySpark.

En utilisant ce kernel, une session spark est créé à l'exécution de la première cellule. Les variable spark et sc seront déjà disponibles.

Il n'est donc plus nécessaire d'exécuter le code "spark = (SparkSession ..." comme dans notre notebook exécuté en local.

In [57]:
# L'exécution de cette cellule démarre l'application Spark

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Import des librairies

In [2]:
# Mesure des durées d'exécution 
import time
# Pour lire les images qui auront été chargées au format binaire
import io
# Pour la gestion des chemins de fichiers
import os

# Désactivation des messages de debugging de tensorflow
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# Pour valider les résultats et les exporter en CSV
import pandas as pd
# Pour redimensionner les images
from PIL import Image
# Pour la manipulation d'arrays
import numpy as np

# Pour l'extraction de features des images
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

# Création et manipulation de dataframes Spark
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

# Pour conversion des features en vecteurs Spark
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
# Pour réalisation de la PCA sur les features
from pyspark.ml.feature import PCA

print("Imports charges ! ")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Imports charges !

### 1. Définition des PATH pour charger les images et enregistrer les résultats 

In [3]:
PATH = 's3://p11-sam-bucket'
PATH_Data = PATH + '/Test'
PATH_Result = PATH + '/Results'
print('PATH :        ' + PATH\
      + '\nPATH_Data :   ' + PATH_Data \
      + '\nPATH_Result : ' + PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH :        s3://p11-sam-bucket
PATH_Data :   s3://p11-sam-bucket/Test
PATH_Result : s3://p11-sam-bucket/Results

### 2. Traitement des données

#### 2.1 Chargement des données

Les images sont chargées au format binaire, ce qui offre plus de souplesse dans la façon de prétraiter les images.

Avant de charger les images, nous spécifions que nous voulons charger uniquement les fichiers dont l'extension est jpg.

Nous indiquons également de charger tous les objets possibles contenus dans les sous-dossiers du dossier communiqué ("recursiveFileLookup").

In [4]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Vérifions que le chargement s'est bien passé :

In [5]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p11-sam-buck...|2025-03-19 18:03:56|  7266|[FF D8 FF E0 00 1...|
|s3://p11-sam-buck...|2025-03-19 18:04:08|  7074|[FF D8 FF E0 00 1...|
|s3://p11-sam-buck...|2025-03-19 18:04:04|  7028|[FF D8 FF E0 00 1...|
|s3://p11-sam-buck...|2025-03-19 18:04:00|  6962|[FF D8 FF E0 00 1...|
|s3://p11-sam-buck...|2025-03-19 18:04:10|  6742|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

Ajout d'une colonne contenant les labels de chaque image :

In [6]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+-------------------------------------+-----+
|path                                 |label|
+-------------------------------------+-----+
|s3://p11-sam-bucket/Test/r_88_100.jpg|Test |
|s3://p11-sam-bucket/Test/145_100.jpg |Test |
|s3://p11-sam-bucket/Test/284_100.jpg |Test |
|s3://p11-sam-bucket/Test/296_100.jpg |Test |
|s3://p11-sam-bucket/Test/82_100.jpg  |Test |
+-------------------------------------+-----+
only showing top 5 rows

None

#### 2.2 Préparation du modèle

Nous allons utiliser la technique du transfert learning pour extraire les features des images.

Nous allons utiliser le modèle MobileNetV2 pour sa rapidité d'exécution comparée à d'autres modèles comme VGG16 par exemple. Nous allons donc récupérer l'avant dernière couche du modèle en sortie. La dernière couche, avec sa fonction d'activation Softmax, est destinée à la classification, ce que nous ne souhaitons pas ici.

MobileNetV2, lorsqu'on l'utilise en incluant toutes ses couches, attend obligatoirement des images de dimension (224,224,3). Nos images étant toutes de dimension (100,100,3), nous devrons simplement les redimensionner avant de les confier au modèle.

Dans l'odre :

- Nous chargeons le modèle MobileNetV2 avec les poids précalculés issus d'imagenet et en spécifiant le format de nos images en entrée
- Nous créons un nouveau modèle avec :
    - en entrée : l'entrée du modèle MobileNetV2
    - en sortie : l'avant dernière couche du modèle MobileNetV2

In [7]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [54]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Affichage du résumé de notre nouveau modèle où nous constatons que nous récupérons en sortie un vecteur de dimension (1, 1, 1280) :

In [9]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_1[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']        

Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids.
Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser ensuite les poids aux différents workeurs.

In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)  # Diffusion des poids
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

####  2.3 Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

Ce notebook définit la logique par étapes, jusqu'à Pandas UDF.

L'empilement des appels est la suivante :

- Pandas UDF
    - featuriser une série d'images pd.Series
    - prétraiter une image

In [12]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # Chargement des images au format binaire et redimensionnement
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)  # Conversion en array
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # Pour certaines couches, les caractéristiques de sortie seront des tenseurs multidimensionnels.
    # Nous aplatissions les tenseurs de caractéristiques en vecteurs pour faciliter le stockage dans les DataFrames Spark.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # Avec PandasUDFType.SCALAR_ITER, nous pouvons charger le modèle une fois puis le réutiliser
    # pour plusieurs lots de données. Cela amortit les coûts liés au chargement des modèles volumineux.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



####  2.4 Exécution des actions d'extraction de features

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), peuvent rencontrer des erreurs de type Out Of Memory (OOM).

Si de telles erreurs arrivent ci-dessous, il est possible de réduire la taille du lot Arrow via 'maxRecordsPerBatch'

Il ne devrait pas y avoir de problème ici, je laisse donc la commande en commentaire.

In [13]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Il s'agit ici d'une étape de transformation dans Spark, autrement dit l'extraction réelle des features n'aura pas encore lieu, elle sera déclenchée par une action plus tard.

In [14]:
# Choix du nombre de partitions que l'on va créer avec "images"
nb_partitions = 20

features_df = images.repartition(nb_partitions).select(col("path"),
                                                       col("label"),
                                                       featurize_udf("content").alias("features")
                                                      )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Affichage de la structure du dataframe features_df :

In [15]:
features_df.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)

In [16]:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 2.5 Réalisation de la PCA 

In [17]:
def nb_composante(dataframe, nb_comp=220):
    '''
    Recherche du nombre de composantes expliquant 95% de la variance
    :param dataframe: Le dataframe d'images
    :param nb_comp: Le nombre de composantes initiales pour l'ACP
    :return: Le nombre de composantes expliquant 95% de la variance totale
    '''

    # Initialisation de l'analyse en composantes principales (ACP)
    pca = PCA(k=nb_comp, inputCol="features_scaled", outputCol="features_pca")

    # Entraînement du modèle PCA
    model_pca = pca.fit(dataframe)

    # Calcul de la variance expliquée
    varexpl = model_pca.explainedVariance * 100

    # Calcul de la variance cumulée
    cumSumVar = varexpl.cumsum()

    # Recherche du nombre de composantes pour atteindre 95% de variance expliquée
    limit = 95
    min_plans = np.argmax(cumSumVar >= limit) + 1

    # Retour du nombre de composantes nécessaires pour expliquer 95% de la variance
    return min_plans

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
def preprocess_pca(dataframe):
  '''
     Préparation des données :
     - transformation en vecteur dense
     - standardisation
     param : dataframe : dataframe d'images
     return : dataframe avec features vecteur dense standardisé
  '''

  # Préparation des données - conversion des données images en vecteur dense
  transform_vecteur_dense = udf(lambda r: Vectors.dense(r), VectorUDT())
  dataframe = dataframe.withColumn('features_vectors', transform_vecteur_dense('features'))

  # Standardisation obligatoire pour PCA
  scaler_std = StandardScaler(inputCol="features_vectors", outputCol="features_scaled", withStd=True, withMean=True)
  model_std = scaler_std.fit(dataframe)
  # Mise à l'échelle
  dataframe = model_std.transform(dataframe)

  return dataframe

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
from pyspark.ml.feature import StandardScaler

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '3'

# Pré-processing (vecteur dense, standardisation)
df_pca = preprocess_pca(features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
# Nombre de composante expliquant 95% de la variance
n_components = nb_composante(df_pca)
n_components

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

158

In [23]:
# Entrainement de l'algorithme
pca = PCA(k=n_components, inputCol='features_scaled', outputCol='features_pca')
model_pca = pca.fit(df_pca)

# Transformation des images sur les k premières composantes
df_reduit = model_pca.transform(df_pca)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 3. Écriture des résultats sous forme de fichiers 

In [24]:
# Write the result after PCA to a parquet file
df_reduit.write.mode("overwrite").parquet(PATH_Result + "/pca_results")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
print(PATH_Result + "/pca_results")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://p11-sam-bucket/Results/pca_results

In [26]:
# Étape 1 - Lire le fichier parquet de la PCA
df = pd.read_parquet(PATH_Result + '/pca_results', engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
df.head(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                     path  ...                                       features_pca
0  s3://p11-sam-bucket/Test/r_168_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...
1    s3://p11-sam-bucket/Test/296_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...
2    s3://p11-sam-bucket/Test/183_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...
3  s3://p11-sam-bucket/Test/r_187_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...
4    s3://p11-sam-bucket/Test/142_100.jpg  ...  {'type': 1, 'size': None, 'indices': None, 'va...

[5 rows x 6 columns]

### export des composantes PCA dans un fichier CSV

In [None]:
# Étape 2 - Nettoyer le nom du fichier
df['path'] = df['path'].apply(lambda x: x.split('/')[-1])
df = df.rename(columns={'path': 'filename'})

# Étape 3 - Extraire les valeurs depuis les vecteurs Spark (qui sont en dict)
df['features_pca'] = df['features_pca'].apply(lambda x: x['values'] if x is not None else None)

In [32]:
# Étape 4 - Sauvegarder au format CSV (attention : tu peux changer le séparateur si tu veux)
with np.printoptions(linewidth=10000):
    df[['filename', 'label', 'features_pca']].to_csv(PATH_Result + '/pca_features.csv', index=False, sep=';')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…