### 1. Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

### 2. Import des librairies & initialisation de la SparkSession

In [1]:
import pandas as pd
import PIL
import numpy as np
import io
import os
from PIL import Image

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql import SparkSession

from pyspark.ml.feature import VectorAssembler, PCA
from pyspark.ml.linalg import Vectors, VectorUDT 

2024-07-13 18:19:10.960246: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-07-13 18:19:10.960910: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-13 18:19:10.963398: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-13 18:19:10.971800: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:479] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-13 18:19:10.989088: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:10575] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registe

In [2]:
spark = (SparkSession
             .builder
             .appName('P8')
             .master('local')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate()
)

24/07/13 18:19:16 WARN Utils: Your hostname, DESKTOP-BTJINQV resolves to a loopback address: 127.0.1.1; using 172.18.63.118 instead (on interface eth0)
24/07/13 18:19:16 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/13 18:19:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/07/13 18:19:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
sc = spark.sparkContext

In [4]:
spark

### 3. Définition des PATH pour charger les images et enregistrer les résultats

In [5]:
PATH = os.getcwd()
PATH_Data = PATH+'/data/Test1'
PATH_Result = PATH+'/data/Results'
PATH_PCA = PATH+'/data/Results_PCA'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        /home/benoit-pr/Projet9
PATH_Data:   /home/benoit-pr/Projet9/data/Test1
PATH_Result: /home/benoit-pr/Projet9/data/Results


### 4.Traitement des données

In [6]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [7]:
images.show(5)

+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|file:/home/benoit...|2024-06-26 16:23:...|  6193|[FF D8 FF E0 00 1...|
|file:/home/benoit...|2024-06-26 16:23:...|  6177|[FF D8 FF E0 00 1...|
|file:/home/benoit...|2024-06-26 16:23:...|  6171|[FF D8 FF E0 00 1...|
|file:/home/benoit...|2024-06-26 16:19:...|  6131|[FF D8 FF E0 00 1...|
|file:/home/benoit...|2024-06-26 16:23:...|  6066|[FF D8 FF E0 00 1...|
+--------------------+--------------------+------+--------------------+
only showing top 5 rows



<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [8]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+-------------------------------------------------------------------+--------------+
|path                                                               |label         |
+-------------------------------------------------------------------+--------------+
|file:/home/benoit-pr/Projet9/data/Test1/Cucumber Ripe/r_202_100.jpg|Cucumber Ripe |
|file:/home/benoit-pr/Projet9/data/Test1/Cucumber Ripe/r_203_100.jpg|Cucumber Ripe |
|file:/home/benoit-pr/Projet9/data/Test1/Cucumber Ripe/r_201_100.jpg|Cucumber Ripe |
|file:/home/benoit-pr/Projet9/data/Test1/Apple Golden 1/84_100.jpg  |Apple Golden 1|
|file:/home/benoit-pr/Projet9/data/Test1/Cucumber Ripe/r_204_100.jpg|Cucumber Ripe |
+-------------------------------------------------------------------+--------------+
only showing top 5 rows



### 5. Préparation du modèle

In [9]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [10]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [11]:
broadcast_weights = sc.broadcast(new_model.get_weights())

In [12]:
new_model.summary()

In [13]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model

### 6. Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [14]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    from PIL import Image
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

In [15]:
def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

In [16]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



### 7. Exécutions des actions d'extractions de features

In [17]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [18]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [19]:
print(PATH_Result)

/home/benoit-pr/Projet9/data/Results


In [20]:
features_df.write.mode("overwrite").parquet(PATH_Result)

2024-07-13 18:19:32.409302: I tensorflow/core/util/port.cc:113] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2024-07-13 18:19:32.410161: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-13 18:19:32.415862: I external/local_tsl/tsl/cuda/cudart_stub.cc:32] Could not find cuda drivers on your machine, GPU will not be used.
2024-07-13 18:19:32.435207: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:479] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-07-13 18:19:32.459011: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:10575] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registe

### 8. Chargement des données enregistrées et validation du résultat

In [21]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

display(df.head())

print('----')
print('Shape of df :', df.shape)
print('----')
print('Number of features :', df.loc[0,'features'].shape[0])

Unnamed: 0,path,label,features
0,file:/home/benoit-pr/Projet9/data/Test1/Cucumb...,Cucumber Ripe,"[1.7274102, 0.0, 0.0, 0.0, 0.0079707615, 0.212..."
1,file:/home/benoit-pr/Projet9/data/Test1/Apple ...,Apple Golden 1,"[0.0, 0.0031530322, 0.013699385, 0.0, 0.0, 0.0..."
2,file:/home/benoit-pr/Projet9/data/Test1/Cocos/...,Cocos,"[1.4222728, 0.07663396, 0.0, 0.057266936, 0.0,..."
3,file:/home/benoit-pr/Projet9/data/Test1/Apple ...,Apple Braeburn,"[0.899588, 0.0011514255, 0.0, 0.0, 0.0, 0.7573..."
4,file:/home/benoit-pr/Projet9/data/Test1/Clemen...,Clementine,"[0.4170071, 0.0, 0.0, 0.0, 0.06749238, 0.12453..."


----
Shape of df : (314, 3)
----
Number of features : 1280


### 9. PCA

In [22]:
# function to convert arrays to udf
array_to_vector_udf = udf(lambda arr: Vectors.dense(arr), VectorUDT())

features_df = features_df.withColumn("features_array", array_to_vector_udf(col("features")))

In [23]:
# PCA 
pca = PCA(k=100, inputCol="features_array", outputCol="pca_features")
model = pca.fit(features_df)
pca_result = model.transform(features_df)

[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step0 + 1) / 1]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step0 + 1) / 1]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 991ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 957ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 928ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 950ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 982ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 985ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 967ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 938ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 945ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 929ms/step1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━

In [24]:
# saving results
pca_result.write.mode("overwrite").parquet(PATH_PCA)

24/07/13 18:21:51 WARN DAGScheduler: Broadcasting large task binary with size 1258.7 KiB
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 1s/step + 1) / 20]
[1m1/1[0m [32m━━━

In [25]:
pca = pd.read_parquet(PATH_PCA, engine='pyarrow')

# cleaning columns
values_arrays = pca['pca_features'].apply(lambda x: x['values'])

pca.loc[:, 'pca_features_cleaned'] = values_arrays

pca = pca[['label', 'pca_features_cleaned']]
display(pca.head(5))

Unnamed: 0,label,pca_features_cleaned
0,Cucumber Ripe,"[0.41949933431876923, 5.52525243235353, 0.3503..."
1,Apple Golden 1,"[-1.351246868484353, 1.6813173445094285, 0.165..."
2,Cocos,"[2.4590244112764306, 8.204240946307078, -0.027..."
3,Apple Braeburn,"[11.556017495831197, -6.728766534340602, 0.163..."
4,Clementine,"[6.188678694567658, -5.7299627022322275, 0.152..."
