### 4.10.1 Démarrage de la session Spark

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1713722877457_0003,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1713722877457_0003,pyspark,idle,Link,Link,,✔


### 4.10.2 Installation des packages

Les packages nécessaires ont été installés via l'étape de **bootstrap**, exécutée sur tous les nœuds du cluster lors de son instanciation.
```bash
#!/bin/bash
# Bootstrap action: install the Python requirements on all cluster nodes,
# not only the master (the executors run the featurization code too).
# Exit non-zero on the first failing command, so a broken install is reported
# at bootstrap time instead of surfacing later as ModuleNotFoundError on a worker.
set -euo pipefail

# update packaging tooling first to prevent pyarrow installation errors
sudo python3 -m pip install -U setuptools
sudo python3 -m pip install -U pip
sudo python3 -m pip install wheel
sudo python3 -m pip install pillow
# same version of tensorflow and keras to avoid module not found error
sudo python3 -m pip install tensorflow==2.11.0
sudo python3 -m pip install keras==2.11.0
sudo python3 -m pip install pandas==1.2.5
# NOTE(review): the packages below (and pillow) are unpinned; pin them for reproducible clusters
sudo python3 -m pip install pyarrow
sudo python3 -m pip install boto3
sudo python3 -m pip install s3fs
sudo python3 -m pip install fsspec

```

### 4.10.3 Import des librairies

In [3]:
import io

import numpy as np
import pandas as pd
from PIL import Image
from pyspark.ml.feature import PCA, StandardScaler, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import PandasUDFType, col, element_at, pandas_udf, split, udf
from pyspark.sql.types import *
from tensorflow.keras import Model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.10.4 Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [4]:
# S3 locations: input images are read from PATH_Data, results are written to PATH_Result
BUCKET = "s3://carl-p8-v2"
PATH_Data = BUCKET + "/sample_test"
PATH_Result = BUCKET + "/Results"

print(
    f"PATH:        {BUCKET}\n"
    f"PATH_Data:   {PATH_Data}\n"
    f"PATH_Result: {PATH_Result}"
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://carl-p8-v2
PATH_Data:   s3://carl-p8-v2/sample_test
PATH_Result: s3://carl-p8-v2/Results

### 4.10.5 Traitement des données

#### 4.10.5.1 Chargement des données

In [5]:
# List every *.jpg file under PATH_Data (including sub-directories) as a
# binaryFile DataFrame; each row carries the file `path` and raw bytes in `content`.
images = (
    spark.read
    .options(pathGlobFilter="*.jpg", recursiveFileLookup="true")
    .format("binaryFile")
    .load(PATH_Data)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
print("({}, {})".format(images.count(), len(images.columns)))  # (rows, columns), like pandas .shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
images.show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>J'ajoute une colonne contenant le **label** de chaque image (le nom de son dossier parent) <br />
    et j'affiche le **path** de chaque image avec son label</u> :

In [8]:
# Add the label of each image: the name of its parent directory
# (…/sample_test/<label>/<file>.jpg -> <label>). All other columns are kept.
images = images.withColumn("label", element_at(split(images["path"], "/"), -2))
# printSchema() and show() print by themselves and return None; wrapping them
# in print() only added a stray "None" line to the output.
images.printSchema()
images.select("path", "label").show(5, truncate=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
images.show(n=20, truncate=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 4.10.5.2 Préparation du modèle

In [10]:
# MobileNetV2 pretrained on ImageNet, used as a feature extractor
model = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=True,
    weights="imagenet",
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Same network without its final classification layer: the output of the
# penultimate layer is used as the image feature vector.
new_model = Model(model.input, model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Ship the truncated model's weights to the workers as a broadcast variable;
# model_fn() reads them back with brodcast_weights.value on each worker.
brodcast_weights = sc.broadcast(value=new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
new_model.summary(expand_nested=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [14]:
def model_fn():
    """
    Build the feature-extraction model on a worker.

    Returns a MobileNetV2 model truncated before its classification layer and
    loaded with the weights broadcast from the driver (``brodcast_weights``).
    """
    # weights=None: every weight of the truncated model is overwritten by
    # set_weights() below, so fetching the ImageNet weights again on each worker
    # is wasted work (and requires outbound network access from the executors).
    model = MobileNetV2(weights=None, include_top=True, input_shape=(224, 224, 3))
    # Inference only: freeze all layers.
    for layer in model.layers:
        layer.trainable = False
    # Drop the final classification layer; the penultimate output is the feature vector.
    new_model = Model(inputs=model.input, outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 4.10.5.3 Définition du processus de chargement des images <br/> et de leur featurisation à l'aide d'une pandas UDF

In [15]:
def preprocess(content):
    """
    Decode raw image bytes into a MobileNetV2-ready array.

    :param content: encoded image bytes (the ``content`` column of the binaryFile DataFrame).
    :return: array of shape (224, 224, 3) (assuming Keras' default channels_last format),
             transformed by MobileNetV2's ``preprocess_input``.
    """
    # convert("RGB") guarantees 3 channels: grayscale ("L"), CMYK or RGBA images would
    # otherwise produce arrays that do not match the model's (224, 224, 3) input and
    # break np.stack / predict for the whole batch. For RGB images it is a plain copy.
    img = Image.open(io.BytesIO(content)).convert("RGB").resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)


def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.

    :param model: Keras model whose ``predict`` output is used as the features.
    :param content_series: pd.Series of encoded image bytes.
    :return: pd.Series holding one flattened feature vector (np.ndarray) per
             input image, in the same order as ``content_series``.
    """
    # np.stack cannot stack zero arrays, so an empty batch is answered directly.
    if len(content_series) == 0:
        return pd.Series([], dtype=object)
    batch = np.stack(content_series.map(preprocess))
    # verbose=0: otherwise Keras prints a progress bar for every batch into the executor logs.
    preds = model.predict(batch, verbose=0)
    # For some layers, output features are multi-dimensional tensors; flatten each
    # one to a 1-D vector so it can be stored in an array column of a Spark DataFrame.
    return pd.Series([p.flatten() for p in preds])


@pandas_udf("array<float>", PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    Scalar-iterator pandas UDF computing MobileNetV2 features from raw image bytes.

    The result is a Spark column of type ArrayType(FloatType).

    :param content_series_iter: iterator over batches of data, each batch being a
                                pandas Series of raw image bytes.
    :return: generator yielding, for each input batch, a pandas Series of feature vectors.
    """
    # Build the model a single time for this iterator and reuse it for every batch,
    # so the model construction cost is paid once instead of once per batch.
    featurizer = model_fn()
    yield from (featurize_series(featurizer, batch) for batch in content_series_iter)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



#### 4.10.5.4 Exécution des actions d'extraction de features

In [16]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
images.show(n=20, truncate=True)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [39]:
# Spread the images over 4 partitions (up to 4 parallel featurization tasks),
# then compute the MobileNetV2 features of each image with the pandas UDF.
# cache(): features_df feeds several actions (parquet write, StandardScaler and
# PCA fits, final export); without it each action re-runs the CNN on every image.
features_df = (
    images.repartition(4)
    .select(col("path"), col("label"), featurize_udf("content").alias("features"))
    .cache()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
features_df.__class__

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<class 'pyspark.sql.dataframe.DataFrame'>

In [19]:
print(f"{PATH_Result}")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://carl-p8-v2/Results

In [20]:
# Persist path, label and features as parquet, replacing any previous run's output
features_df.write.parquet(PATH_Result, mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# features_df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(path='s3://carl-p8-v2/sample_test/Cantaloupe 1/r_124_100.jpg', label='Cantaloupe 1', features=[0.01395356934517622, 0.0, 0.0, 0.0, 0.0, 0.5506893992424011, 0.11148536205291748, 0.0, 0.0813228189945221, 0.0, 0.0, 0.028242111206054688, 0.0, 0.0, 0.027744263410568237, 2.2419822216033936, 0.0, 0.026646282523870468, 0.03873852267861366, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.007424850948154926, 0.3810492157936096, 0.22255054116249084, 0.0, 0.0, 0.0, 0.0, 0.0, 0.5949232578277588, 0.9085151553153992, 3.4851419925689697, 0.0, 0.0, 0.0, 0.0, 0.0, 0.05613243207335472, 0.0, 0.36973631381988525, 1.578869104385376, 0.09867190569639206, 0.0, 0.004996591713279486, 0.7225787043571472, 0.0, 0.0, 0.0, 1.180759072303772, 0.15176866948604584, 0.0, 0.0, 0.0, 0.0, 0.06988798081874847, 0.0006442167214117944, 0.0, 0.22822539508342743, 0.023661185055971146, 0.0015062701422721148, 0.0026557494420558214, 0.0, 0.7860575318336487, 0.0, 0.0, 0.0, 0.0022411346435546875, 1.2454360723495483, 0.0, 0.0, 0.0,

### 4.10.6 Chargement des données enregistrées et validation du résultat

In [25]:
# Read the saved parquet dataset back into pandas to validate the result
df = pd.read_parquet(path=PATH_Result, engine="pyarrow")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [36]:
df.head(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                           features
0  s3://carl-p8-v2/sample_test/Strawberry/327_100...  ...  [1.4755306, 0.0, 0.0, 0.0, 0.0, 0.3159332, 1.7...
1  s3://carl-p8-v2/sample_test/Apple Braeburn/r_9...  ...  [0.0, 0.006487224, 0.0, 0.0, 0.0, 0.5786728, 0...
2  s3://carl-p8-v2/sample_test/Apple Braeburn/r_4...  ...  [0.70797396, 0.051301703, 0.0, 0.0, 0.0, 0.447...
3  s3://carl-p8-v2/sample_test/Apple Braeburn/r_9...  ...  [0.9978231, 0.037637126, 0.0, 0.0, 0.0, 1.2061...
4  s3://carl-p8-v2/sample_test/Apple Braeburn/r_8...  ...  [1.4039954, 0.19252859, 0.0, 0.0, 0.03644992, ...

[5 rows x 3 columns]

In [37]:
df.at[0, "features"].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [38]:
(len(df), len(df.columns))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(494, 3)

# 5. PCA

In [40]:
# First, convert the array of floats to a single vector column
# assembler = VectorAssembler(inputCols=["features"], outputCol="features_vector")
# features_df = assembler.transform(features_df)

# Now, drop the original features columns and rename the new vector column
# features_df = features_df.drop(*features_cols).withColumnRenamed("features_vector", "features")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [32]:
# from pyspark.ml.feature import VectorAssembler

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
# VectorAssembler(inputCols=["features"], outputCol="features_vector")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
# featurize_udf produces "features" as array<float>, whereas pyspark.ml
# estimators (StandardScaler, PCA) expect a VectorUDT column.
# Vectors.dense accepts a single list argument, so it serves directly as the UDF body.
array_to_vector_udf = udf(Vectors.dense, VectorUDT())

# Replace the array column with its dense-vector equivalent, keeping the column name
features_df = features_df.withColumn("features", array_to_vector_udf("features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
# Standardize every feature dimension (centered and scaled to unit std) before PCA
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=True,
    withStd=True,
)
scalerModel = scaler.fit(features_df)
scaledData = scalerModel.transform(features_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [45]:
# apply PCA: project the standardized features onto N_PC principal components
N_PC = 100
pca = PCA(k=N_PC, inputCol="scaledFeatures", outputCol="pcaFeatures")
pcaModel = pca.fit(scaledData)
# Keep the image identifiers next to the projections; selecting only
# "pcaFeatures" would make it impossible to match each row to its image.
pcaData = pcaModel.transform(scaledData).select("path", "label", "pcaFeatures")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [51]:
# print((pcaData.count(), len(pcaData.columns)))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(494, 1)

In [None]:
# Proportion of the variance explained by each principal component (values in [0, 1])
expVariance = pcaModel.explainedVariance

# total explained variance; the ":.2%" format multiplies the proportion by 100
# and appends "%", so the printed figure really is a percentage
total_explained = float(expVariance.toArray().sum())
print(f"{N_PC} principal components explain {total_explained:.2%} of the variance")

In [52]:
# pcaData.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Row(pcaFeatures=DenseVector([-11.2006, 12.2269]))

# 6. Save PCA results

In [None]:
# Spread the PCA vector over one column per component (col1 ... col{N_PC}),
# keeping every other column of pcaData (e.g. path / label) unchanged.
# pcaFeatures is a VectorUDT column, so it is first turned into an array of
# doubles with vector_to_array, whose items can then be selected by index.
pca_array = vector_to_array(col("pcaFeatures"))
id_columns = [c for c in pcaData.columns if c != "pcaFeatures"]

pca_split = pcaData.select(
    *id_columns,
    *[pca_array[i].alias(f"col{i+1}") for i in range(N_PC)],
)

# Show the DataFrame with split columns
pca_split.show()

In [None]:
# concat pca features to image dataframe, matching rows on the image path.
# A join without a key is a cartesian product (every image paired with every
# PCA row); selecting path/label also keeps the raw binary `content` column
# of `images` out of the exported results.
# NOTE(review): assumes pca_split carries a `path` column; a `label` column,
# if present, is dropped to avoid a duplicate after the join.
results_concat = images.select("path", "label").join(
    pca_split.drop("label"), on="path", how="inner"
)

In [46]:
# save PCA result to csv (collected on the driver with toPandas()).
# index=False: the pandas RangeIndex carries no information and would otherwise
# be written as an extra unnamed first column.
# NOTE(review): this file is written inside the parquet dataset directory created
# by the earlier parquet write; parquet readers of PATH_Result may trip on it —
# consider a separate prefix.
results_concat.toPandas().to_csv(PATH_Result + "/pcaFeatures_v2.csv", index=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…