# Modeling

## Starting the Spark session

In [1]:
# Running this cell starts the Spark application

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1712058004734_0001,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


<u>Displaying information about the current session and links to the Spark UI</u>:

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1712058004734_0001,pyspark,idle,Link,Link,,✔


## Installing packages

The required packages were installed by the **bootstrap** step when the server was instantiated.

## Importing libraries

In [4]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from keras.preprocessing.image import img_to_array
from keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

## Defining the PATHs for loading images and saving results

We access our **data on S3** directly, as if it were **stored locally**.

In [5]:
PATH = 's3://ocangrand-p9-data'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        s3://ocangrand-p9-data
PATH_Data:   s3://ocangrand-p9-data/Test
PATH_Result: s3://ocangrand-p9-data/Results

## Data processing

### Loading the data

In [6]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [7]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://ocangrand-p9...|2024-03-28 13:57:47|  7353|[FF D8 FF E0 00 1...|
|s3://ocangrand-p9...|2024-03-28 13:57:47|  7350|[FF D8 FF E0 00 1...|
|s3://ocangrand-p9...|2024-03-28 13:57:47|  7349|[FF D8 FF E0 00 1...|
|s3://ocangrand-p9...|2024-03-28 13:57:47|  7348|[FF D8 FF E0 00 1...|
|s3://ocangrand-p9...|2024-03-28 13:57:48|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>I add a column containing the **label** of each image, taken from the parent folder in its **path**, <br />
    and display only the **path** and **label** columns</u>:

In [8]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
# printSchema() and show() print directly and return None, so they are not wrapped in print()
images.printSchema()
images.select('path','label').show(5,False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+----------------------------------------------------+----------+
|path                                                |label     |
+----------------------------------------------------+----------+
|s3://ocangrand-p9-data/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://ocangrand-p9-data/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://ocangrand-p9-data/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://ocangrand-p9-data/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://ocangrand-p9-data/Test/Watermelon/r_95_100.jpg |Watermelon|
+----------------------------------------------------+----------+
only showing top 5 rows
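As a plain-Python illustration of what `element_at(split(path, '/'), -2)` computes for a single row, the sketch below splits a path on `/` and takes the second-to-last segment, which is the folder holding the image. `label_from_path` is a hypothetical helper written for this illustration; it is not part of the notebook or of PySpark.

```python
def label_from_path(path: str) -> str:
    """Return the name of the folder that directly contains the file."""
    parts = path.split('/')
    # Index -2 is the second-to-last segment: the parent folder of the file.
    return parts[-2]


example = 's3://ocangrand-p9-data/Test/Watermelon/r_106_100.jpg'
print(label_from_path(example))  # Watermelon
```

This only works because the dataset stores each class in its own folder, so the folder name can serve as the label.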

### Preparing the model

In [9]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [10]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [11]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [12]:
new_model.summary()

Model: "functional_1"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)        ┃ Output Shape      ┃    Param # ┃ Connected to      ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
│ input_layer         │ (None, 224, 224,  │          0 │ -                 │
│ (InputLayer)        │ 3)                │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1 (Conv2D)      │ (None, 112, 112,  │        864 │ input_layer[0][0] │
│                     │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ bn_Conv1            │ (None, 112, 112,  │        128 │ Conv1[0][0]       │
│ (BatchNormalizatio… │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1_relu (ReLU)   │ (None, 112, 112,  │          0

In [13]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

### Defining the image loading process <br/> and applying featurization through a pandas UDF

In [14]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # Named `inputs` to avoid shadowing the built-in input() function.
    inputs = np.stack(content_series.map(preprocess))
    preds = model.predict(inputs)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

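The comments in `featurize_udf` note that a Scalar Iterator UDF lets the model be loaded once and reused across batches. The sketch below shows that pattern in plain Python, without Spark or Keras: a generator builds a stand-in "model" once, then applies it to every batch it receives. `load_model`, `featurize_batches` and `load_count` are hypothetical names for this illustration, and the stand-in model simply returns the byte length of each item.

```python
from typing import Iterator, List

load_count = 0  # counts how many times the stand-in model is built


def load_model():
    """Stand-in for model_fn(): imagine this call is expensive."""
    global load_count
    load_count += 1
    return lambda batch: [len(item) for item in batch]


def featurize_batches(batches: Iterator[List[bytes]]) -> Iterator[List[int]]:
    model = load_model()      # built once per iterator, not once per batch
    for batch in batches:
        yield model(batch)    # the same model object handles every batch


batches = iter([[b'ab', b'c'], [b'def'], [b'', b'ghij']])
results = list(featurize_batches(batches))
print(results)     # [[2, 1], [3], [0, 4]]
print(load_count)  # 1
```

Three batches are processed but the model is built only once, which is the saving the UDF comments describe.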



### Running the feature extraction

In [14]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [15]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [16]:
print(PATH_Result)

s3://ocangrand-p9-data/Results

===============================

## Dimensionality reduction with PCA

In [17]:
from pyspark.ml.feature import PCA
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.sql.types import ArrayType, FloatType

The 'features' column is converted from array to vector so that it can be used by the PCA.  
The features_df DataFrame will be used several times, so I persist it to save time on later calls.

In [18]:
features_df = features_df.withColumn('features', array_to_vector('features'))
features_df.persist()

DataFrame[path: string, label: string, features: vector]

In [19]:
pca = PCA(k=1280, inputCol='features', outputCol='reduc_features')

In [20]:
df_pca = pca.fit(features_df)

I compute the cumulative explained variance to choose the k for which the PCA retains 99% of the variance.

In [21]:
cumValues = df_pca.explainedVariance.cumsum()

In [22]:
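# cumValues[x] is the variance explained by the first x + 1 components (x is a zero-based index).
# num_comp stores the index x itself, so k = num_comp keeps at most 99% of the variance;
# k = x + 1 would be the first value to exceed the threshold.
for x in range(0,1280):
    variance = cumValues[x]
    if variance>0.99:
        num_comp = x
        break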

print(num_comp)

772
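The selection of k from the cumulative explained variance can also be sketched in plain Python. The notebook's loop stores the zero-based index at which the threshold is first exceeded; the hypothetical helper below, `components_for_variance`, returns the component count instead (index + 1). The name, the default threshold and the toy variance ratios are assumptions made for this illustration only.

```python
from bisect import bisect_right
from itertools import accumulate


def components_for_variance(explained_variance, threshold=0.99):
    """Smallest number of leading components whose cumulative
    explained variance is strictly greater than `threshold`."""
    cumulative = list(accumulate(explained_variance))
    # bisect_right counts the cumulative values <= threshold, which is also
    # the zero-based index of the first value strictly above the threshold.
    first_index = bisect_right(cumulative, threshold)
    if first_index == len(cumulative):
        raise ValueError("threshold is never exceeded")
    return first_index + 1  # convert the index to a number of components


# Toy ratios; the cumulative sums are 0.5, 0.75, 0.875, 0.9375, 1.0
ratios = [0.5, 0.25, 0.125, 0.0625, 0.0625]
print(components_for_variance(ratios, threshold=0.8))  # 3
```

Since the cumulative sum never decreases, a binary search finds the cut-off without scanning every value, though for 1280 components the difference is negligible.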

In [23]:
pca2 = PCA(k=num_comp, inputCol='features', outputCol='reduc_features')

In [24]:
df_pca2 = pca2.fit(features_df)

I apply the transformation, convert the reduced features back from vector to array, and drop the original features, which are no longer needed.

In [25]:
features_df_reduc = df_pca2.transform(features_df)
features_df_reduc = features_df_reduc.withColumn('reduc_features', vector_to_array('reduc_features', dtype='float32'))
features_df_reduc = features_df_reduc.drop('features')

===================================================================

The result is written in Parquet format, which Spark splits into several part files (one per partition).

In [26]:
features_df_reduc.write.mode("overwrite").parquet(PATH_Result)

### Loading the saved data and validating the result

In [27]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [30]:
df.shape

(22688, 3)

In [35]:
df.columns

Index(['path', 'label', 'reduc_features'], dtype='object')

In [28]:
df.head()

                                                path  ...                                     reduc_features
0  s3://ocangrand-p9-data/Test/Watermelon/r_89_10...  ...  [-2.604326, 5.5291176, -5.521852, -3.459821, 8...
1  s3://ocangrand-p9-data/Test/Watermelon/r_98_10...  ...  [-1.9239061, 4.3120093, -5.238205, -2.8958635,...
2  s3://ocangrand-p9-data/Test/Watermelon/r_48_10...  ...  [-1.6070489, 5.0849524, -7.463815, -4.9713793,...
3  s3://ocangrand-p9-data/Test/Pineapple Mini/53_...  ...  [-5.924988, 3.3677704, 1.8358918, -4.0382013, ...
4  s3://ocangrand-p9-data/Test/Pineapple Mini/59_...  ...  [-5.8269, 3.7045207, 1.9708878, -4.48328, -2.3...

[5 rows x 3 columns]

In [29]:
df.loc[0,'reduc_features'].shape

(772,)
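The check above inspects only the first row. One possible way to extend it to every row is sketched below: it returns the positions of any feature vectors whose length differs from the expected one. `check_feature_lengths` is a hypothetical helper and the toy vectors are made up for this illustration.

```python
def check_feature_lengths(feature_vectors, expected_length):
    """Return the positions of vectors whose length differs from expected_length."""
    return [i for i, vec in enumerate(feature_vectors)
            if len(vec) != expected_length]


# Toy data: the third vector is one element short.
vectors = [[0.1, 0.2, 0.3], [0.4, 0.5, 0.6], [0.7, 0.8]]
print(check_feature_lengths(vectors, expected_length=3))  # [2]
```

In the notebook this could presumably be called as `check_feature_lengths(df['reduc_features'], 772)`, where an empty list would mean every row has the expected dimension.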