## Démarrage de la session Spark

In [1]:
# Running this cell starts the Spark application

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1698654140027_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Affichage des informations sur la session en cours et liens vers Spark UI

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1698654140027_0001,pyspark,idle,Link,Link,✔


## Import des bibliothèques

In [3]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Définition des PATH pour charger les images et enregistrer les résultats

In [4]:
# Root S3 location, plus the input (images) and output (results) locations
# derived from it.
PATH = 's3://ypv-p8-data'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
# Echo the three locations, one per line, so they can be checked before any I/O.
print('PATH:        '+PATH
      +'\nPATH_Data:   '+PATH_Data
      +'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://ypv-p8-data
PATH_Data:   s3://ypv-p8-data/Test
PATH_Result: s3://ypv-p8-data/Results

## Traitement des données
### Chargement des données

In [5]:
# Load every *.jpg found under PATH_Data (sub-folders included) with Spark's
# "binaryFile" source: one row per file, raw bytes in the `content` column.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Preview the first 5 loaded rows.
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://ypv-p8-data/...|2023-10-26 11:50:37|  4893|[FF D8 FF E0 00 1...|
|s3://ypv-p8-data/...|2023-10-26 11:50:37|  4875|[FF D8 FF E0 00 1...|
|s3://ypv-p8-data/...|2023-10-26 11:50:37|  4841|[FF D8 FF E0 00 1...|
|s3://ypv-p8-data/...|2023-10-26 11:50:36|  4830|[FF D8 FF E0 00 1...|
|s3://ypv-p8-data/...|2023-10-26 11:50:36|  4371|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

In [7]:
# Derive the class label from the image's parent folder: the second-to-last
# '/'-separated component of the path
# (e.g. s3://ypv-p8-data/Test/Avocado/r_4_100.jpg -> 'Avocado').
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
# printSchema() and show() write to stdout themselves and return None, so they
# are called directly; wrapping them in print() only echoed a stray "None".
images.printSchema()
images.select('path','label').show(5,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------+-------+
|path                                      |label  |
+------------------------------------------+-------+
|s3://ypv-p8-data/Test/Avocado/r_4_100.jpg |Avocado|
|s3://ypv-p8-data/Test/Avocado/r_5_100.jpg |Avocado|
|s3://ypv-p8-data/Test/Avocado/r_41_100.jpg|Avocado|
|s3://ypv-p8-data/Test/Avocado/r_40_100.jpg|Avocado|
|s3://ypv-p8-data/Test/Avocado/87_100.jpg  |Avocado|
+------------------------------------------+-------+
only showing top 5 rows

None

### Préparation du modèle

In [8]:
# Full MobileNetV2 (classification head included) with ImageNet weights,
# taking 224x224 RGB inputs.
model = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=True,
    weights='imagenet',
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [9]:
# Feature extractor: same input as `model`, but the output is taken from the
# second-to-last layer, i.e. the final layer of `model` is left out.
new_model = Model(
    inputs=model.input,
    outputs=model.layers[-2].output,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Publish the feature extractor's weights as a Spark broadcast variable;
# model_fn() reads them back through `.value`.
# NOTE(review): `sc` is not defined in this notebook — presumably the
# SparkContext injected by the session; confirm.
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Print the layer-by-layer summary of the feature extractor.
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [12]:
def model_fn():
    """
    Build the feature-extraction network from the broadcast weights.

    Returns a MobileNetV2-based model whose output is the second-to-last
    layer of the full network, with every layer frozen and all weights
    loaded from the ``brodcast_weights`` broadcast variable.
    """
    # weights=None: do not download the ImageNet weights here. Every weight of
    # the returned model is overwritten by set_weights() below, so fetching
    # them again wherever this function runs would be redundant network I/O
    # (and would fail on nodes without internet access).
    base_model = MobileNetV2(weights=None,
                             include_top=True,
                             input_shape=(224, 224, 3))
    # Inference only: freeze all layers.
    for layer in base_model.layers:
        layer.trainable = False
    feature_extractor = Model(inputs=base_model.input,
                              outputs=base_model.layers[-2].output)
    # The broadcast list comes from an identically-built model (see the cell
    # creating `brodcast_weights`), so it matches this model's weight layout.
    feature_extractor.set_weights(brodcast_weights.value)
    return feature_extractor

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Featurisation

In [13]:
def preprocess(content):
    """
    Decode raw image bytes into a model-ready array.

    :param content: encoded image file contents (bytes).
    :return: the array returned by ``preprocess_input`` for the image decoded,
             converted to RGB and resized to 224x224.
    """
    img = Image.open(io.BytesIO(content))
    # Force 3 channels: grayscale, CMYK or RGBA files would otherwise produce
    # arrays with 1 or 4 channels, which cannot be stacked with RGB images nor
    # fed to a model expecting (224, 224, 3) inputs. RGB images are unchanged.
    img = img.convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Compute one feature vector per raw image of a pandas Series.

    :param model: object exposing ``predict`` on a stacked batch of images.
    :param content_series: pd.Series of raw image bytes.
    :return: a pd.Series of flattened feature arrays, one per input image.
    """
    # Preprocess each image, then stack the results into a single batch array.
    preprocessed = content_series.map(preprocess)
    batch = np.stack(preprocessed)
    predictions = model.predict(batch)
    # Some layers output multi-dimensional tensors; flatten each prediction to
    # a 1-D vector so it can be stored in a Spark DataFrame column.
    flattened = []
    for prediction in predictions:
        flattened.append(prediction.flatten())
    return pd.Series(flattened)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Scalar Iterator pandas UDF wrapping the featurization function.
    The decorator declares the returned Spark column type as 'array<float>'.

    :param content_series_iter: iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # The model is built once per call and re-used for every batch of the
    # iterator, which amortizes the cost of loading it.
    extractor = model_fn()
    yield from (featurize_series(extractor, batch) for batch in content_series_iter)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



In [14]:
# Spread the images over 20 partitions, then compute the feature array of each
# image while keeping its path and label alongside.
features_df = (
    images
    .repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Check the column types: `features` is expected to be 'array<float>'.
features_df.dtypes

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('path', 'string'), ('label', 'string'), ('features', 'array<float>')]

In [16]:
# Preview the extracted features (this triggers the featurization job).
features_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+--------------------+
|                path|  label|            features|
+--------------------+-------+--------------------+
|s3://ypv-p8-data/...|Avocado|[0.43423668, 0.0,...|
|s3://ypv-p8-data/...|Avocado|[0.84571016, 0.0,...|
|s3://ypv-p8-data/...|Avocado|[0.3558726, 0.0, ...|
|s3://ypv-p8-data/...|Avocado|[0.08634439, 0.0,...|
|s3://ypv-p8-data/...|Avocado|[1.3703405, 0.0, ...|
|s3://ypv-p8-data/...|Avocado|[0.8746656, 0.0, ...|
|s3://ypv-p8-data/...|Avocado|[0.6303431, 0.0, ...|
|s3://ypv-p8-data/...|Avocado|[0.36890134, 0.0,...|
|s3://ypv-p8-data/...|Apricot|[0.12042749, 0.24...|
|s3://ypv-p8-data/...|Avocado|[0.8335934, 0.0, ...|
|s3://ypv-p8-data/...|Apricot|[0.2801924, 0.180...|
|s3://ypv-p8-data/...|Avocado|[0.27026844, 0.0,...|
|s3://ypv-p8-data/...|Apricot|[0.22149888, 0.30...|
|s3://ypv-p8-data/...|Apricot|[0.30879328, 0.25...|
|s3://ypv-p8-data/...|Apricot|[0.18892598, 0.25...|
|s3://ypv-p8-data/...|Apricot|[0.22485153, 0.38...|
|s3://ypv-p8

## Transformation de l'array des features en Vector

In [17]:
# UDF turning an array of numbers into an ML dense vector (VectorUDT), the
# column type used as input by StandardScaler and PCA further down.
list_to_vector_udf = udf(lambda values: Vectors.dense(values), VectorUDT())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [18]:
# Same rows as features_df, with the `features` array converted to a vector.
df_with_vectors = features_df.select(
    col("path"),
    col("label"),
    list_to_vector_udf(col("features")).alias("features"),
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# Preview the rows after the array -> vector conversion.
df_with_vectors.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+--------------------+
|                path|  label|            features|
+--------------------+-------+--------------------+
|s3://ypv-p8-data/...|Avocado|[0.43423667550086...|
|s3://ypv-p8-data/...|Avocado|[0.84571015834808...|
|s3://ypv-p8-data/...|Avocado|[0.35587260127067...|
|s3://ypv-p8-data/...|Avocado|[0.08634439110755...|
|s3://ypv-p8-data/...|Avocado|[1.37034046649932...|
|s3://ypv-p8-data/...|Avocado|[0.87466561794281...|
|s3://ypv-p8-data/...|Avocado|[0.63034307956695...|
|s3://ypv-p8-data/...|Avocado|[0.36890134215354...|
|s3://ypv-p8-data/...|Apricot|[0.12042748928070...|
|s3://ypv-p8-data/...|Avocado|[0.83359342813491...|
|s3://ypv-p8-data/...|Apricot|[0.28019240498542...|
|s3://ypv-p8-data/...|Avocado|[0.27026844024658...|
|s3://ypv-p8-data/...|Apricot|[0.22149887681007...|
|s3://ypv-p8-data/...|Apricot|[0.30879327654838...|
|s3://ypv-p8-data/...|Apricot|[0.18892598152160...|
|s3://ypv-p8-data/...|Apricot|[0.22485153377056...|
|s3://ypv-p8

## Standardisation

In [20]:
# Fit a standard scaler (centering and unit-variance scaling both enabled)
# on the feature vectors.
scaler = StandardScaler(withMean=True, withStd=True,
                        inputCol='features',
                        outputCol='scaledFeatures').fit(df_with_vectors)

# Add the standardized vectors as the `scaledFeatures` column and preview them.
df_scaled = scaler.transform(df_with_vectors)
df_scaled.show(6)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------+--------------------+--------------------+
|                path|  label|            features|      scaledFeatures|
+--------------------+-------+--------------------+--------------------+
|s3://ypv-p8-data/...|Avocado|[0.43423667550086...|[-0.0338907992986...|
|s3://ypv-p8-data/...|Avocado|[0.84571015834808...|[1.24459169052246...|
|s3://ypv-p8-data/...|Avocado|[0.35587260127067...|[-0.2773745252512...|
|s3://ypv-p8-data/...|Avocado|[0.08634439110755...|[-1.1148211933135...|
|s3://ypv-p8-data/...|Avocado|[1.37034046649932...|[2.87466189081741...|
|s3://ypv-p8-data/...|Avocado|[0.87466561794281...|[1.33455872271230...|
+--------------------+-------+--------------------+--------------------+
only showing top 6 rows

## Réduction de dimension

In [21]:
# Project the standardized features onto their first 2 principal components.
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
# The fitted PCA gets its own name: binding it to `model` overwrote the Keras
# MobileNetV2 `model` defined earlier, so re-running the cell that builds
# `new_model` after this one would fail.
pca_model = pca.fit(df_scaled)
result = pca_model.transform(df_scaled)
print('Explained Variance Ratio', pca_model.explainedVariance.toArray())
result.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Explained Variance Ratio [0.34720323 0.17626962]
+--------------------+-------+--------------------+--------------------+--------------------+
|                path|  label|            features|      scaledFeatures|         pcaFeatures|
+--------------------+-------+--------------------+--------------------+--------------------+
|s3://ypv-p8-data/...|Avocado|[0.43423667550086...|[-0.0338907992986...|[-12.422524818156...|
|s3://ypv-p8-data/...|Avocado|[0.84571015834808...|[1.24459169052246...|[-23.330534821401...|
|s3://ypv-p8-data/...|Avocado|[0.35587260127067...|[-0.2773745252512...|[-28.621932378135...|
|s3://ypv-p8-data/...|Avocado|[0.08634439110755...|[-1.1148211933135...|[-30.111787551363...|
|s3://ypv-p8-data/...|Avocado|[1.37034046649932...|[2.87466189081741...|[-26.436692392112...|
|s3://ypv-p8-data/...|Avocado|[0.87466561794281...|[1.33455872271230...|[-14.022579001952...|
|s3://ypv-p8-data/...|Avocado|[0.63034307956695...|[0.57542818282728...|[-13.775099661257...|
|s3://ypv-p

### Rappel du PATH où seront inscrits les fichiers au format "parquet" :

In [22]:
# Reminder of the output location used by the write cells below.
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://ypv-p8-data/Results

### Enregistrement des données traitées au format "parquet" :

In [23]:
# Persist the full result as parquet under PATH_Result, replacing any
# previous output at that location.
result.write.parquet(PATH_Result+'/parquet', mode="overwrite")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Enregistrement des données traitées au format "csv" :

In [24]:
# Collect the whole Spark result into a single pandas DataFrame.
# Every row and column (full feature vectors included) is brought back to
# one process, so this only suits small datasets.
pca_df = result.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Display the collected DataFrame.
pca_df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                          path  ...                                 pcaFeatures
0     s3://ypv-p8-data/Test/Avocado/87_100.jpg  ...  [-12.422524818156083, -19.146926371320134]
1   s3://ypv-p8-data/Test/Avocado/r_41_100.jpg  ...    [-23.33053482140182, 11.092559380018086]
2    s3://ypv-p8-data/Test/Avocado/r_4_100.jpg  ...    [-28.621932378135273, 26.91890073666033]
3    s3://ypv-p8-data/Test/Avocado/r_5_100.jpg  ...   [-30.111787551363044, 25.415953797713584]
4   s3://ypv-p8-data/Test/Avocado/r_40_100.jpg  ...   [-26.436692392112214, 13.606500424376733]
5     s3://ypv-p8-data/Test/Avocado/74_100.jpg  ...  [-14.022579001952998, -15.011523002247296]
6     s3://ypv-p8-data/Test/Avocado/88_100.jpg  ...  [-13.775099661257144, -21.155488091799715]
7     s3://ypv-p8-data/Test/Avocado/60_100.jpg  ...   [-14.140202571470574, -20.39238612025534]
8     s3://ypv-p8-data/Test/Avocado/73_100.jpg  ...  [-12.797352595869281, -16.372893868026587]
9     s3://ypv-p8-data/Test/Apricot/44_1

In [31]:
# Write the collected pandas DataFrame as 'result.csv' under PATH_Result.
# to_csv keeps its defaults here, so the row index is written as well.
pca_df.to_csv(PATH_Result+'/'+'result.csv')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Chargement des données enregistrées et validation du résultat

In [27]:
# Read back the parquet output written earlier, to validate what was saved.
dfs = spark.read.parquet(PATH_Result+'/parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Collect the reloaded data into a pandas DataFrame for inspection.
df = dfs.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## On affiche les 5 premières lignes du DataFrame

In [29]:
# First 5 rows of the reloaded data.
df.head(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                         path  ...                                 pcaFeatures
0    s3://ypv-p8-data/Test/Avocado/73_100.jpg  ...  [-12.797352595869281, -16.372893868026587]
1    s3://ypv-p8-data/Test/Apricot/44_100.jpg  ...      [18.114101461294062, 2.37903271052807]
2    s3://ypv-p8-data/Test/Apricot/48_100.jpg  ...    [15.838143903502543, 1.7954677915495234]
3    s3://ypv-p8-data/Test/Avocado/59_100.jpg  ...  [-13.454795548110424, -18.779111208469562]
4  s3://ypv-p8-data/Test/Avocado/r_40_100.jpg  ...   [-26.436692392112214, 13.606500424376733]

[5 rows x 5 columns]

### On vérifie la taille du vecteur de caractéristiques des images après réduction à 2 dimensions

In [30]:
# Shape of the PCA vector on the row labelled 0; (2,) is expected since the
# PCA was configured with k=2.
df.loc[0,'pcaFeatures'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(2,)