###  Démarrage de la session Spark

In [None]:
# L'exécution de cette cellule démarre l'application Spark

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

###  Installation des packages

Les packages sont installés directement par Databricks.


###  Import des librairies

In [None]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

2024-04-15 12:50:30.824883: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


###  Définition des PATH pour charger les images et enregistrer les résultats

In [None]:
# Bucket root, plus the input-image prefix and the output prefix beneath it.
PATH = 's3://projet9'
PATH_Data = f'{PATH}/Test1'
PATH_Result = f'{PATH}/Results'
# Echo the three locations so the run log records where data is read and written.
print(f'PATH:        {PATH}\nPATH_Data:   {PATH_Data}\nPATH_Result: {PATH_Result}')

PATH:        s3://projet9
PATH_Data:   s3://projet9/Test1
PATH_Result: s3://projet9/Results


## Liaison avec S3

In [None]:

# Credentials are read from the environment instead of being written into the
# notebook, where they would leak through version control and shared exports.
# A missing variable raises KeyError immediately rather than failing later in S3.
access_key = os.environ["AWS_ACCESS_KEY_ID"]
secret_key = os.environ["AWS_SECRET_ACCESS_KEY"]
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", access_key)
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", secret_key)

# If you are using Auto Loader file notification mode to load files, provide the AWS Region ID.
aws_region = "eu-west-1"
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3." + aws_region + ".amazonaws.com")

# Connectivity smoke test. The bucket holds JPEG files, so they are listed with
# the binaryFile source; parsing them as CSV only produced unreadable rows.
df = spark.read.format("binaryFile").load('s3a://projet9/Test1/*')

# Show only lightweight metadata columns, not the raw image bytes.
df.select("path", "length").show(5, truncate=False)

+-----------------------------------------------------------------------------------+
|���� JFIF      �� C \a\t\b\a\t\a\b\v\b\t|
+-----------------------------------------------------------------------------------+
|                                                                    �� C|
|                                                               �� \b d d" ...|
|                                                               \v�� � ...|
|                                                               %&'()*456789...|
|                                                               \v�� � \a...|
|                                                               $4�%�&'()*56...|
|                                                                         �F\tP�6+>�|
|                                                               ���v��M_�ƣ�=BmW^�...|
|                                                     

In [None]:
# Mounting an already-mounted directory raises, so mount only when /mnt/s3 is
# not yet registered. This keeps the cell re-runnable on a live cluster.
if not any(mount.mountPoint == "/mnt/s3" for mount in dbutils.fs.mounts()):
    dbutils.fs.mount("s3a://projet9", "/mnt/s3")


True

In [None]:
# Load the test folder through Spark's "image" data source.
# NOTE(review): images_df is not referenced again in the visible cells.
images_df = (
    spark.read
    .format("image")
    .load("/mnt/s3/Test1")
)


###  Traitement des données

####  Chargement des données

In [None]:
# Read every *.jpg below the mounted test folder (sub-folders included)
# as binary files.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load("/mnt/s3/Test1")
)

In [None]:
# Preview the first five rows of the binary-file DataFrame.
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|dbfs:/mnt/s3/Test...|2024-04-11 22:36:32|  5638|[FF D8 FF E0 00 1...|
|dbfs:/mnt/s3/Test...|2024-04-11 22:36:32|  5634|[FF D8 FF E0 00 1...|
|dbfs:/mnt/s3/Test...|2024-04-11 22:36:35|  5633|[FF D8 FF E0 00 1...|
|dbfs:/mnt/s3/Test...|2024-04-11 22:36:37|  5618|[FF D8 FF E0 00 1...|
|dbfs:/mnt/s3/Test...|2024-04-11 22:36:33|  5611|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows



<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [None]:
# The label is the second-to-last component of the file path, i.e. the name of
# the folder that directly contains the image.
# element_at and split are already imported in the notebook's import cell.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# printSchema() and show() print by themselves and return None; wrapping them
# in print() only appended a stray "None" line after each output.
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------------------------------------+------------------+
|path                                              |label             |
+--------------------------------------------------+------------------+
|dbfs:/mnt/s3/Test1/Apple Crimson Snow/r_25_100.jpg|Apple Crimson Snow|
|dbfs:/mnt/s3/Test1/Apple Crimson Snow/r_24_100.jpg|Apple Crimson Snow|
|dbfs:/mnt/s3/Test1/Apple Crimson Snow/r_30_100.jpg|Apple Crimson Snow|
|dbfs:/mnt/s3/Test1/Apple Crimson Snow/r_32_100.jpg|Apple Crimson Snow|
|dbfs:/mnt/s3/Test1/Apple Crimson Snow/r_26_100.jpg|Apple Crimson Snow|
+--------------------------------------------------+------------------+
only showing top 5 rows

None


####  Préparation du modèle

In [None]:
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

In [None]:
# Full MobileNetV2 with ImageNet weights and its top layer included,
# built on the driver.
model = MobileNetV2(
    weights='imagenet',
    include_top=True,
    input_shape=(224, 224, 3),
)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5


In [None]:
# Truncate the network at its second-to-last layer: that layer's output is
# what new_model returns.
new_model = Model(
    inputs=model.input,
    outputs=model.layers[-2].output,
)

## Diffusion des poids sur les workers

In [None]:
# Broadcast the truncated model's weights through the SparkContext.
# The variable name (spelled "brodcast") is read back by model_fn, so it must
# be kept in sync with that function if it is ever renamed.
brodcast_weights = sc.broadcast(new_model.get_weights())

In [None]:
# Print the layer-by-layer summary of the truncated model.
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_1[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']        

In [None]:
def model_fn():
    """
    Build the truncated MobileNetV2 and load the broadcast weights into it.

    Only the architecture is instantiated here; every weight comes from
    ``brodcast_weights``, so no pretrained-weight download is triggered
    where this function runs.

    :return: a Keras Model mapping a (224, 224, 3) image to the output of
             MobileNetV2's second-to-last layer.
    """
    # weights=None: all values are overwritten by set_weights() below, so
    # loading the ImageNet weights first would be wasted work and would
    # require network access wherever this function runs.
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

#### Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.

    :param content: the encoded image file as bytes.
    :return: the 224x224 RGB image as an array passed through
             MobileNetV2's ``preprocess_input``.
    """
    # convert('RGB') forces three channels: grayscale, palette, CMYK or RGBA
    # files would otherwise yield arrays the 3-channel model cannot accept.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Turn a pd.Series of raw image bytes into a pd.Series of feature vectors.

    :param model: object exposing ``predict`` on a stacked image batch.
    :param content_series: pd.Series whose elements are raw image bytes.
    :return: a pd.Series holding one flattened feature array per image.
    """
    # Preprocess each image, then stack them into a single batch array.
    batch = np.stack([preprocess(raw_bytes) for raw_bytes in content_series])
    predictions = model.predict(batch)
    # Some layers emit multi-dimensional tensors; flatten each prediction to a
    # vector so it can be stored in a Spark DataFrame column.
    return pd.Series([prediction.flatten() for prediction in predictions])

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Scalar Iterator pandas UDF that applies featurize_series to each batch.
    The decorator declares the returned column type as array<float>.

    :param content_series_iter: iterator over batches of data, each batch
                              being a pandas Series of image data.
    '''
    # Build the model once per invocation and reuse it for every batch, so the
    # model-loading cost is not paid again for each batch.
    feature_extractor = model_fn()
    yield from (
        featurize_series(feature_extractor, batch)
        for batch in content_series_iter
    )



#### Exécution des actions d'extraction de features

In [None]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [None]:
# Repartition into 24 partitions, then keep the path and label and add the
# feature vector computed by the pandas UDF on the raw image bytes.
features_df = (
    images
    .repartition(24)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

In [None]:
# Show the destination the features are about to be written to.
print(PATH_Result)

s3://projet9/Results


In [None]:
# Write the features as Parquet to PATH_Result; "overwrite" mode replaces any
# output already stored at that location.
features_df.write.mode("overwrite").parquet(PATH_Result)

###  Chargement des données enregistrées et validation du résultat

In [None]:
# Credentials are taken from the environment so that no secret is stored in
# the notebook; a missing variable fails fast with KeyError.
spark.conf.set("fs.s3a.access.key", os.environ["AWS_ACCESS_KEY_ID"])
spark.conf.set("fs.s3a.secret.key", os.environ["AWS_SECRET_ACCESS_KEY"])

# spark.read.parquet only needs the path. The former format= and
# engine='pyarrow' keywords are pandas.read_parquet arguments; Spark forwarded
# them as reader options with no meaning, so they are dropped.
df = spark.read.parquet(PATH_Result)


In [None]:
# Display the first reloaded row (path, label and feature vector).
df.head()

Row(path='dbfs:/mnt/s3/Test1/Apple Braeburn/r_324_100.jpg', label='Apple Braeburn', features=[0.0, 0.0, 0.0, 0.0, 0.0, 0.3567659854888916, 0.0, 0.0, 0.4790845513343811, 0.0, 0.0, 0.11472087353467941, 0.0, 0.21677826344966888, 0.0, 0.12471695989370346, 0.0, 0.0, 0.03761693462729454, 0.18193046748638153, 0.7525757551193237, 0.001459715305827558, 0.0, 0.08489514142274857, 0.0766235888004303, 0.0, 0.0, 0.0, 0.035098347812891006, 0.29426267743110657, 0.3976087272167206, 0.0, 0.0, 0.0, 0.0, 0.4807571768760681, 0.395297110080719, 1.9173842668533325, 0.03558724373579025, 0.0, 0.050677601248025894, 0.0, 0.018720898777246475, 1.138931155204773, 0.0, 0.0, 0.28014299273490906, 0.44888079166412354, 0.03797989338636398, 0.006397286430001259, 0.0, 0.5280892848968506, 0.0, 0.8200268745422363, 0.0, 2.8976383209228516, 0.050280775874853134, 0.0, 0.0, 0.18777263164520264, 0.0, 0.9550777077674866, 0.10281313955783844, 0.0, 1.6376429796218872, 0.10090731829404831, 0.3317829966545105, 0.0, 0.023321745917201

In [None]:
# Length of the feature vector stored in the first row.
len(df.select('features').first()['features'])

1280

In [None]:
# Count the reloaded rows and columns as a final sanity check on the output.
num_rows, num_cols = df.count(), len(df.columns)

print(f"Number of rows: {num_rows}")
print(f"Number of columns: {num_cols}")


Number of rows: 312
Number of columns: 3
