### 1.1 Démarrage de la session Spark

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1680715667470_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1680715667470_0001,pyspark,idle,Link,Link,,✔


### 1.2 Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

### 1.3 Import des librairies

In [1]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

ModuleNotFoundError: No module named 'tensorflow'

### 1.4 Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [4]:
PATH = 's3://emanuelepartenza-projet8'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://emanuelepartenza-projet8
PATH_Data:   s3://emanuelepartenza-projet8/Test
PATH_Result: s3://emanuelepartenza-projet8/Results

### 1.5 Traitement des données

#### 1.5.1 Chargement des données

In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://emanuelepart...|2023-04-05 10:32:11|  6328|[FF D8 FF E0 00 1...|
|s3://emanuelepart...|2023-04-05 10:32:14|  6322|[FF D8 FF E0 00 1...|
|s3://emanuelepart...|2023-04-05 10:32:05|  6308|[FF D8 FF E0 00 1...|
|s3://emanuelepart...|2023-04-05 10:31:10|  6304|[FF D8 FF E0 00 1...|
|s3://emanuelepart...|2023-04-05 10:31:56|  6300|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [7]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+-------------------------------------------------------------+--------------+
|path                                                         |label         |
+-------------------------------------------------------------+--------------+
|s3://emanuelepartenza-projet8/Test/Apple Golden 1/114_100.jpg|Apple Golden 1|
|s3://emanuelepartenza-projet8/Test/Apple Golden 1/103_100.jpg|Apple Golden 1|
|s3://emanuelepartenza-projet8/Test/Apple Golden 1/101_100.jpg|Apple Golden 1|
|s3://emanuelepartenza-projet8/Test/Apple Golden 1/96_100.jpg |Apple Golden 1|
|s3://emanuelepartenza-projet8/Test/Apple Golden 1/100_100.jpg|Apple Golden 1|
+-------------------------------------------------------------+--------------+
only showing top 5 rows

None

#### 1.5.2 Préparation du modèle

In [8]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [9]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [12]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1.5.3 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



#### 1.5.4 Exécutions des actions d'extractions de features

In [14]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 1.5.5 PCA

In [21]:
import time
from pyspark.sql.functions import udf
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [22]:
def pca_transformation(df, n_components=50, col_image='features'):
    
    """
    Applique un algorithme de PCA sur l'ensemble des images pour réduire la dimension de chaque image 
    du jeu de données.
    
    Paramètres:
    df(pyspark dataFrame): contient une colonne avec les données images
    n_components(int): nombre de dimensions à conserver
    col_image(string): nom de la colonne où récupérer les données images
    """

    # Initilisation du temps de calcul
    start_time = time.time()

    # Les données images sont converties au format vecteur dense
    ud_f = udf(lambda r: Vectors.dense(r), VectorUDT())
    df = df.withColumn('features', ud_f('features'))
    
    standardizer = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                                  withStd=True, withMean=True)
    model_std = standardizer.fit(df)
    df = model_std.transform(df)

    # Entrainement de l'algorithme
    pca = PCA(k=n_components, inputCol='scaledFeatures', outputCol='pcaFeatures')
    model_pca = pca.fit(df)

    # Transformation des images sur les k premières composantes
    df = model_pca.transform(df)

    df = df.filter(df.pcaFeatures.isNotNull())
    
    # Affiche le temps de calcul
    print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))

    return df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
pca_df = pca_transformation(features_df)
pca_df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Temps d'execution 524.02 secondes
+--------------------+--------------+--------------------+--------------------+--------------------+
|                path|         label|            features|      scaledFeatures|         pcaFeatures|
+--------------------+--------------+--------------------+--------------------+--------------------+
|s3://emanuelepart...|Apple Golden 1|[0.0,0.0104257911...|[-0.8562760789128...|[7.34856954418275...|
|s3://emanuelepart...|Apple Golden 1|[0.0,0.0834322348...|[-0.8562760789128...|[6.5955599854301,...|
|s3://emanuelepart...|Apple Golden 1|[0.01090431213378...|[-0.8360288905009...|[4.43380727582671...|
|s3://emanuelepart...|  Cantaloupe 2|[0.34131395816802...|[-0.2225223919461...|[6.96026744806433...|
|s3://emanuelepart...|  Cantaloupe 2|[0.01884618028998...|[-0.8212823848133...|[3.51006709241068...|
+--------------------+--------------+--------------------+--------------------+--------------------+
only showing top 5 rows

In [24]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://emanuelepartenza-projet8/Results

In [25]:
pca_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 1.6 Chargement des données enregistrées et validation du résultat

In [26]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [27]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                        pcaFeatures
0  s3://emanuelepartenza-projet8/Test/Apple Golde...  ...  {'type': 1, 'size': None, 'indices': None, 'va...
1  s3://emanuelepartenza-projet8/Test/Apple Golde...  ...  {'type': 1, 'size': None, 'indices': None, 'va...
2  s3://emanuelepartenza-projet8/Test/Apple Golde...  ...  {'type': 1, 'size': None, 'indices': None, 'va...
3  s3://emanuelepartenza-projet8/Test/Cantaloupe ...  ...  {'type': 1, 'size': None, 'indices': None, 'va...
4  s3://emanuelepartenza-projet8/Test/Cantaloupe ...  ...  {'type': 1, 'size': None, 'indices': None, 'va...

[5 rows x 5 columns]

In [29]:
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(4055, 5)

In [38]:
df.to_csv('s3://emanuelepartenza-projet8/Partenza_Emanuele_2_images_042023.csv')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
import pandas as pd

In [4]:
df = pd.read_csv('Partenza_Emanuele_2_images_032023.csv')

In [5]:
df

Unnamed: 0.1,Unnamed: 0,path,label,features,scaledFeatures,pcaFeatures
0,0,s3://emanuelepartenza-projet8/Test/Apple Golde...,Apple Golden 1,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
1,1,s3://emanuelepartenza-projet8/Test/Apple Golde...,Apple Golden 1,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
2,2,s3://emanuelepartenza-projet8/Test/Apple Golde...,Apple Golden 1,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
3,3,s3://emanuelepartenza-projet8/Test/Cantaloupe ...,Cantaloupe 2,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
4,4,s3://emanuelepartenza-projet8/Test/Cantaloupe ...,Cantaloupe 2,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
...,...,...,...,...,...,...
4050,4050,s3://emanuelepartenza-projet8/Test/Banana Red/...,Banana Red,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
4051,4051,s3://emanuelepartenza-projet8/Test/Banana Lady...,Banana Lady Finger,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
4052,4052,s3://emanuelepartenza-projet8/Test/Banana/r_23...,Banana,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
4053,4053,s3://emanuelepartenza-projet8/Test/Banana/r_49...,Banana,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
