<a href="https://colab.research.google.com/github/amadousysada/cloud-big-data-pyspark/blob/main/notebook.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# 1 Starting the Spark session

In [1]:
# Running this cell starts the Spark application

Starting Spark application


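| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|----|---------------------|------|-------|----------|------------|------|------------------|
| 3 | application_1758223640353_0004 | pyspark | idle | Link | Link | | ✔ |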

SparkSession available as 'spark'.

<u>Displaying information about the current session and links to the Spark UI</u>:

In [2]:
%%info

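| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|----|---------------------|------|-------|----------|------------|------|------------------|
| 3 | application_1758223640353_0004 | pyspark | idle | Link | Link | | ✔ |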


# 2 Package installation

The required packages were installed by the **bootstrap** step when the server was instantiated.

# 3 Importing libraries

In [3]:
import io
import os

import pandas as pd
import numpy as np
import tensorflow as tf

from PIL import Image

import matplotlib
import matplotlib.pyplot as plt

from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.ml.feature import StandardScaler, PCA
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.storagelevel import StorageLevel
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml import Pipeline

print(f"Pyspark: {pyspark.__version__}")
print(f"Tensorflow: {tf.__version__}")
print(f"Pandas: {pd.__version__}")
print(f"Numpy: {np.__version__}")
print(f"Matplotlib: {matplotlib.__version__}")
print(f"PIL: {Image.__version__}")

Pyspark: 3.4.1+amzn.2
Tensorflow: 2.11.0
Pandas: 1.3.5
Numpy: 1.20.0
Matplotlib: 3.5.3
PIL: 9.5.0

# 4 Defining the paths for loading images and saving results

We access our **data on S3** directly, as if it were **stored locally**.

In [4]:
PATH = "s3://p11-fruits-pipeline/raw/fruits/fruits-360"
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'

# 5 Data processing

## 5.1 Loading the data

In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)
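The `pathGlobFilter` and `recursiveFileLookup` options above select every `*.jpg` file under `PATH_Data`, whatever the depth of its sub-folder. As a rough local analogy (an illustration only, not how Spark actually lists S3 objects), the same selection can be sketched with `os.walk` and `fnmatch`; the helper name `list_images` is hypothetical.

```python
import fnmatch
import os
from typing import List


def list_images(root: str, pattern: str = "*.jpg") -> List[str]:
    """Hypothetical helper: recursively collect files whose *name* matches `pattern`.

    Mirrors the intent of the binaryFile options used above:
    - recursiveFileLookup=true -> walk every sub-directory (os.walk)
    - pathGlobFilter="*.jpg"   -> match the glob against the file name only
    """
    matches = []
    for dirpath, _dirnames, filenames in os.walk(root):
        for name in filenames:
            # fnmatchcase keeps the match case-sensitive on every OS
            if fnmatch.fnmatchcase(name, pattern):
                matches.append(os.path.join(dirpath, name))
    return sorted(matches)
```

With this rule, `Watermelon/r_1_100.jpg` and `Apple/sub/r_2_100.jpg` are kept, while `notes.txt` or an upper-case `.JPG` extension would be skipped.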

In [6]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p11-fruits-p...|2025-09-15 11:36:11|  7353|[FF D8 FF E0 00 1...|
|s3://p11-fruits-p...|2025-09-15 11:36:12|  7350|[FF D8 FF E0 00 1...|
|s3://p11-fruits-p...|2025-09-15 11:36:11|  7349|[FF D8 FF E0 00 1...|
|s3://p11-fruits-p...|2025-09-15 11:36:11|  7348|[FF D8 FF E0 00 1...|
|s3://p11-fruits-p...|2025-09-15 11:36:12|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>I use the image **path** to add <br />
    a column containing the **label** of each image (the name of its parent folder)</u>:
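`element_at` is 1-based and, with a negative index, counts from the end of the array, so `element_at(split(path, '/'), -2)` returns the second-to-last path segment, i.e. the folder that contains the image. A plain-Python equivalent, using the hypothetical helper name `label_from_path`:

```python
def label_from_path(path: str) -> str:
    """Hypothetical helper: return the parent folder name of an image path.

    Equivalent to element_at(split(path, '/'), -2) in Spark SQL:
    index -1 is the file name, -2 is the directory holding it (the fruit class).
    """
    return path.split("/")[-2]


print(label_from_path(
    "s3://p11-fruits-pipeline/raw/fruits/fruits-360/Test/Watermelon/r_106_100.jpg"
))  # Watermelon
```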

In [7]:
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print directly and return None, so they are not wrapped in print()
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+----------------------------------------------------------------------------+----------+
|path                                                                        |label     |
+----------------------------------------------------------------------------+----------+
|s3://p11-fruits-pipeline/raw/fruits/fruits-360/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p11-fruits-pipeline/raw/fruits/fruits-360/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p11-fruits-pipeline/raw/fruits/fruits-360/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p11-fruits-pipeline/raw/fruits/fruits-360/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p11-fruits-pipeline/raw/fruits/fruits-360/Test/Watermelon/r_95_100.jpg |Watermelon|
+----------------------------------------------------------------------------+----------+

## 5.2 Preparing the model

In [8]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [9]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [11]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [12]:
def model_fn():
    """
    Returns a MobileNetV2 model with the top layer removed,
    initialised with the broadcast pretrained weights.
    """
    # weights=None: the ImageNet weights do not need to be downloaded on each
    # executor, since they are overwritten by the broadcast weights below
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

## 5.3 Defining the image loading process <br/> and applying featurization with pandas UDFs
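The `preprocess` function resizes each image to the 224x224 input expected by MobileNetV2, then calls `preprocess_input`, which for MobileNetV2 rescales pixel values linearly from [0, 255] to [-1, 1] (x / 127.5 - 1). That arithmetic, sketched on plain Python floats with the hypothetical helper name `scale_like_mobilenet_v2`:

```python
from typing import List


def scale_like_mobilenet_v2(pixels: List[float]) -> List[float]:
    """Hypothetical helper reproducing the MobileNetV2 pixel scaling.

    Maps the 8-bit range [0, 255] linearly onto [-1, 1]: x / 127.5 - 1.
    """
    return [p / 127.5 - 1.0 for p in pixels]


print(scale_like_mobilenet_v2([0.0, 127.5, 255.0]))  # [-1.0, 0.0, 1.0]
```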

In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # convert("RGB") guarantees 3 channels even for grayscale or RGBA files
    img = Image.open(io.BytesIO(content)).convert("RGB").resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # 'batch' avoids shadowing Python's built-in input()
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

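from typing import Iterator

# Type hints (Iterator[pd.Series] -> Iterator[pd.Series]) replace the deprecated
# PandasUDFType.SCALAR_ITER argument and select the same Scalar Iterator UDF type
@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]: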
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
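The Scalar Iterator pattern used by `featurize_udf` boils down to a generator that performs its expensive setup once and then streams over the batches. The sketch below reproduces that control flow without Spark or TensorFlow; `load_model` is a hypothetical stand-in for `model_fn`, and the counter only exists to show that loading happens once per iterator, not once per batch.

```python
from typing import Callable, Iterable, Iterator, List


def featurize_batches(
    batches: Iterable[List[float]],
    load_model: Callable[[], Callable[[float], float]],
) -> Iterator[List[float]]:
    """Load the (expensive) model once, then apply it lazily to every batch."""
    model = load_model()  # runs once, when the first batch is requested
    for batch in batches:
        yield [model(x) for x in batch]


load_count = 0


def load_model() -> Callable[[float], float]:
    """Hypothetical stand-in for model_fn(): counts how often it is called."""
    global load_count
    load_count += 1
    return lambda x: x * 2


results = list(featurize_batches([[1.0, 2.0], [3.0], [4.0, 5.0]], load_model))
print(results, load_count)  # [[2.0, 4.0], [6.0], [8.0, 10.0]] 1
```

In Spark, the iterator handed to the UDF covers the Arrow batches of one partition, so with `repartition(24)` the model should be built roughly once per partition task rather than once per batch.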

## 5.4 Running the feature extraction

In [14]:
features_df = images.repartition(24).select(col("path"), col("label"), featurize_udf("content").alias("features"))

### 5.4.1 Dimensionality reduction: PCA

In [15]:
# array_to_vector (imported from pyspark.ml.functions) turns array<float> into an ML
# DenseVector natively, without shadowing the import with a Python UDF
features_df = features_df.withColumn("features_input", array_to_vector(features_df["features"]))

In [16]:
row_1 = features_df.first()
num_features = len(row_1["features"])
print(f"Number of features: {num_features}")

Number of features: 1280

> Standardization
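`StandardScaler` with `withMean=True` and `withStd=True` centres each feature on its mean and divides it by its standard deviation (Spark documents this as the corrected sample standard deviation). This matters before PCA, because features with a large raw variance would otherwise dominate the principal components. A per-feature sketch in plain Python, with the hypothetical helper name `standardize`:

```python
import statistics
from typing import List


def standardize(values: List[float]) -> List[float]:
    """Z-score one feature column: (x - mean) / sample standard deviation.

    Assumes the column is not constant (a zero std would divide by zero).
    """
    mean = statistics.mean(values)
    std = statistics.stdev(values)  # n - 1 denominator (corrected sample std)
    return [(v - mean) / std for v in values]


z = standardize([1.0, 2.0, 3.0, 4.0, 5.0])
print([round(v, 4) for v in z])  # [-1.2649, -0.6325, 0.0, 0.6325, 1.2649]
```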

In [17]:
scaler = StandardScaler(
    inputCol="features_input",
    outputCol="scaled_features",
    withStd=True,
    withMean=True
)
pca = PCA(
    k=num_features,
    inputCol=scaler.getOutputCol(),
    outputCol="pca_features"
)

pipeline = Pipeline(stages=[scaler, pca])

In [18]:
model = pipeline.fit(features_df)

In [19]:
explained_variance = model.stages[-1].explainedVariance.toArray()
print("Variance explained by the principal components:")
print(explained_variance)

Variance explained by the principal components:
[7.67207412e-02 5.04070070e-02 4.36569740e-02 ... 1.52878720e-05
 1.44198048e-05 1.41866501e-05]

In [20]:
explained_variance = model.stages[-1].explainedVariance.toArray()
cumulative_variance = np.cumsum(explained_variance)

df_results = pd.DataFrame({
    'Dimension': ["Dim" + str(i+1) for i in range(len(explained_variance))],
    'Explained variance (%)': explained_variance * 100,
    'Cumulative sum (%)': cumulative_variance * 100
})

df_results

     Dimension  Explained variance (%)  Cumulative sum (%)
0         Dim1                7.672074            7.672074
1         Dim2                5.040701           12.712775
2         Dim3                4.365697           17.078472
3         Dim4                3.021573           20.100045
4         Dim5                2.783341           22.883386
...        ...                     ...                 ...
1275   Dim1276                0.001573           99.994062
1276   Dim1277                0.001548           99.995611
1277   Dim1278                0.001529           99.997139
1278   Dim1279                0.001442           99.998581
1279   Dim1280                0.001419          100.000000

[1280 rows x 3 columns]
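The next cell picks k as the smallest number of components whose cumulative explained variance reaches the target: `np.searchsorted` (default `side='left'`) returns the first index where the cumulative sum is greater than or equal to the target, and adding 1 turns that 0-based index into a component count. The same rule in plain Python with `bisect.bisect_left`; the function name `choose_k` and the toy variances are illustrative only.

```python
import bisect
from itertools import accumulate
from typing import List


def choose_k(explained_variance: List[float], target: float) -> int:
    """Smallest k such that the first k components explain >= target of the variance.

    Assumes target <= total variance; otherwise the result would be len + 1.
    """
    cumulative = list(accumulate(explained_variance))  # non-decreasing, like np.cumsum
    index = bisect.bisect_left(cumulative, target)     # first i with cumulative[i] >= target
    return index + 1                                   # 0-based index -> component count


print(choose_k([0.5, 0.3, 0.15, 0.05], 0.9))  # 3
```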

In [21]:
target = 0.99
n = len(cumulative_variance)
x = np.arange(1, n + 1)
k = np.searchsorted(cumulative_variance, target) + 1

fig, ax = plt.subplots(figsize=(12, 6))
ax.plot(x, cumulative_variance)
ax.axvline(x=k, linestyle='--', color="red")
#ax.axhline(y=target, linestyle=':')

ax.set_xlabel("Components")
ax.set_ylabel("Cumulative variance")

ax.annotate(
    f"k = {k}",
    xy=(k, cumulative_variance[k-1]),
    xytext=(k + n*0.05, target),
    arrowprops=dict(arrowstyle="->"),
    rotation=90,
    color="red",
    fontweight="bold"
)
plt.suptitle(f"Cumulative variance and choice of k (≥ {target:.0%})")
plt.show()

In [22]:
print(f"With {k} dimensions we capture {target:.0%} of the variance")

With 983 dimensions we capture 99% of the variance

In [23]:
pca = PCA(
    k=k,
    inputCol=scaler.getOutputCol(),
    outputCol="pca_features"
)

pipeline = Pipeline(stages=[scaler, pca])

In [24]:
model = pipeline.fit(features_df)

In [25]:
features_df = model.transform(features_df)

### 5.4.2 Saving the processed data in **Parquet** format

In [26]:
features_df = features_df.withColumn("pca_features", vector_to_array(features_df["pca_features"]))
features_df.drop('scaled_features', 'features_input').write.mode("overwrite").parquet(PATH_Result)

# 6 Loading the saved data and validating the result

In [27]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [28]:
df.head()

                                                path  ...                                       pca_features
0  s3://p11-fruits-pipeline/raw/fruits/fruits-360...  ...  [-9.801482800863004, 4.593026540829984, -3.928...
1  s3://p11-fruits-pipeline/raw/fruits/fruits-360...  ...  [-10.37006167936477, 4.893478026537261, -4.993...
2  s3://p11-fruits-pipeline/raw/fruits/fruits-360...  ...  [-15.656754851939636, 3.5869597611761126, -6.8...
3  s3://p11-fruits-pipeline/raw/fruits/fruits-360...  ...  [-5.738487898637016, 4.796668166982608, 3.5587...
4  s3://p11-fruits-pipeline/raw/fruits/fruits-360...  ...  [-12.01986290191231, 3.311426517836176, -5.628...

[5 rows x 4 columns]

In [31]:
df.loc[0,'pca_features'].shape

(983,)

In [30]:
df.shape

(22688, 4)
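The two cells above inspect the DataFrame shape and a single `pca_features` vector. To extend that check to every row, a small helper could verify that each vector has exactly k finite values. The sketch below is an assumption for illustration: it accepts any sequence of sequences (for example `df['pca_features']`), and the name `validate_pca_features` is hypothetical.

```python
import math
from typing import Sequence


def validate_pca_features(vectors: Sequence[Sequence[float]], k: int) -> None:
    """Raise ValueError if any vector does not have exactly k finite components."""
    for i, vec in enumerate(vectors):
        if len(vec) != k:
            raise ValueError(f"row {i}: expected {k} components, got {len(vec)}")
        if not all(math.isfinite(v) for v in vec):
            raise ValueError(f"row {i}: contains NaN or infinite values")


validate_pca_features([[0.1, -2.0, 3.5], [1.0, 0.0, -0.5]], k=3)  # passes silently
```

Applied to the loaded data, the call would be `validate_pca_features(df['pca_features'], k=983)`.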