## 1 Initialisation

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1696757650813_0003,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 2 Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

## 3 Import des librairies

In [29]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.functions import vector_to_array


import matplotlib.pyplot as plt

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 4 Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [3]:
PATH = 's3://p8-data-yr'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://p8-data-yr
PATH_Data:   s3://p8-data-yr/Test
PATH_Result: s3://p8-data-yr/Results

## 5 Traitement des données

### 5.1 Chargement des données

In [7]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
print(images.printSchema())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)

None

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [9]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+-------------------------------------------------+--------------+
|path                                             |label         |
+-------------------------------------------------+--------------+
|s3://p8-data-yr/Test/Apple Braeburn/r_326_100.jpg|Apple Braeburn|
|s3://p8-data-yr/Test/Apple Braeburn/r_4_100.jpg  |Apple Braeburn|
|s3://p8-data-yr/Test/Apple Braeburn/r_8_100.jpg  |Apple Braeburn|
|s3://p8-data-yr/Test/Apple Braeburn/r_324_100.jpg|Apple Braeburn|
|s3://p8-data-yr/Test/Apple Braeburn/r_327_100.jpg|Apple Braeburn|
+-------------------------------------------------+--------------+
only showing top 5 rows

None

### 5.2 Préparation du modèle

In [10]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [13]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [14]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 5.3 Définition du processus de chargement des images et application <br/>de leur featurisation à travers l'utilisation de pandas UDF

In [15]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



### 5.4 Exécution des actions d'extraction de features

In [16]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://p8-data-yr/Results

In [18]:
features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 5.5 Chargement des données enregistrées et validation du résultat

In [19]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                               path  ...                                           features
0     s3://p8-data-yr/Test/Apple Red 3/r_94_100.jpg  ...  [0.9351357, 0.124995366, 0.0, 0.0, 0.12237616,...
1       s3://p8-data-yr/Test/Apple Red 3/66_100.jpg  ...  [0.9927635, 0.0031073727, 0.0015551314, 0.0, 0...
2             s3://p8-data-yr/Test/Guava/67_100.jpg  ...  [0.6223958, 0.0, 0.014646112, 0.0, 0.0, 0.0, 0...
3  s3://p8-data-yr/Test/Apple Braeburn/r_83_100.jpg  ...  [1.5073447, 0.17095171, 0.0, 0.0, 0.20871237, ...
4             s3://p8-data-yr/Test/Guava/63_100.jpg  ...  [0.7219886, 0.0, 0.0, 0.0, 0.0, 0.0, 0.4469454...

[5 rows x 3 columns]

In [21]:
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

## 6 PCA

### 6.1 Transformation de l'array en vecteur 

In [22]:
array_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
vectorized_df = features_df.withColumn('vectors', array_to_vector_udf('features'))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
vectorized_df.show(6)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+--------------------+
|                path|         label|            features|             vectors|
+--------------------+--------------+--------------------+--------------------+
|s3://p8-data-yr/T...|Apple Braeburn|[0.013541504, 0.0...|[0.01354150380939...|
|s3://p8-data-yr/T...|Apple Braeburn|[0.8709561, 0.005...|[0.87095612287521...|
|s3://p8-data-yr/T...|   Apple Red 3|[0.5668191, 0.256...|[0.56681907176971...|
|s3://p8-data-yr/T...|   Apple Red 3|[1.2215025, 1.714...|[1.22150254249572...|
|s3://p8-data-yr/T...|   Apple Red 3|[1.4597661, 0.502...|[1.45976614952087...|
|s3://p8-data-yr/T...|         Guava|[0.21388806, 0.0,...|[0.21388806402683...|
+--------------------+--------------+--------------------+--------------------+
only showing top 6 rows

### 6.2 Normalisation des données

In [25]:
scaler = StandardScaler(
    inputCol = 'vectors', 
    outputCol = 'scaledFeatures',
    withMean = True,
    withStd = True
).fit(vectorized_df)

# when we transform the dataframe, the old
# feature will still remain in it
df_scaled = scaler.transform(vectorized_df)
df_scaled.show(6)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+--------------------+--------------------+
|                path|         label|            features|             vectors|      scaledFeatures|
+--------------------+--------------+--------------------+--------------------+--------------------+
|s3://p8-data-yr/T...|Apple Braeburn|[0.013541504, 0.0...|[0.01354150380939...|[-1.0143140882551...|
|s3://p8-data-yr/T...|Apple Braeburn|[0.8709561, 0.005...|[0.87095612287521...|[0.86355287712091...|
|s3://p8-data-yr/T...|   Apple Red 3|[0.5668191, 0.256...|[0.56681907176971...|[0.19744699495135...|
|s3://p8-data-yr/T...|   Apple Red 3|[1.2215025, 1.714...|[1.22150254249572...|[1.63130225600768...|
|s3://p8-data-yr/T...|   Apple Red 3|[0.75695336, 0.18...|[0.75695335865020...|[0.61386967818858...|
|s3://p8-data-yr/T...|   Apple Red 3|[1.1464746, 0.716...|[1.14647459983825...|[1.46697977786974...|
+--------------------+--------------+--------------------+--------------------+------------

### 6.3 Application de l'ACP

In [30]:
n_components = 2
pca = PCA(
    k = n_components, 
    inputCol = 'scaledFeatures', 
    outputCol = 'pcaFeatures'
).fit(df_scaled)

df_pca = pca.transform(df_scaled)
print('Explained Variance Ratio', pca.explainedVariance.toArray())
df_pca.show(6)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Explained Variance Ratio [0.15644731 0.11537846]
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|                path|      label|            features|             vectors|      scaledFeatures|         pcaFeatures|
+--------------------+-----------+--------------------+--------------------+--------------------+--------------------+
|s3://p8-data-yr/T...|Apple Red 3|[0.9351357, 0.124...|[0.93513572216033...|[1.00411585175263...|[0.24858413784830...|
|s3://p8-data-yr/T...|Apple Red 3|[0.9927635, 0.003...|[0.99276351928710...|[1.13032939430881...|[-3.1836015923725...|
|s3://p8-data-yr/T...|Apple Red 3|[0.75695336, 0.18...|[0.75695335865020...|[0.61386967818858...|[-2.4035160659529...|
|s3://p8-data-yr/T...|Apple Red 3|[1.1464746, 0.716...|[1.14647459983825...|[1.46697977786974...|[1.79999031958972...|
|s3://p8-data-yr/T...|      Guava|[0.24572544, 0.0,...|[0.24572543799877...|[-0.5057963534160...|[7.87038328303009...|

In [31]:
pca_results = df_pca.withColumn("pca", vector_to_array("pcaFeatures")).select(["path", "label"] + [col("pca")[i] for i in range(2)])
pca_results.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------+-------------------+-------------------+
|                path|      label|             pca[0]|             pca[1]|
+--------------------+-----------+-------------------+-------------------+
|s3://p8-data-yr/T...|Apple Red 3|0.24858413784830957| -12.50481423807145|
|s3://p8-data-yr/T...|Apple Red 3|-3.1836015923725225|  -8.22500981040577|
|s3://p8-data-yr/T...|Apple Red 3| -2.403516065952976|-10.219838159888889|
|s3://p8-data-yr/T...|Apple Red 3|  1.799990319589725| -9.259526906684512|
|s3://p8-data-yr/T...|      Guava|  7.870383283030091|-23.139189777230474|
+--------------------+-----------+-------------------+-------------------+
only showing top 5 rows

### 6.4 Ecritures des résultats dans S3

In [32]:
pca_results.write.mode("overwrite").csv(PATH + '/PCA_result')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…