# 1. Démarrage SPARK



In [1]:
# demarrage spark

## 2. Import des librairies

In [None]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

In [None]:
from pyspark.ml.feature import PCA
from pyspark.ml.functions import array_to_vector
from pyspark.ml.functions import vector_to_array

## 3. Définition des PATH pour charger les images <br /> et enregistrer les résultats

Accès ver le dossier S3

In [None]:
PATH = 's3://fruitsaf33'
PATH_Data = PATH+'/data'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        /content/drive/MyDrive/Colab Notebooks/data
PATH_Data:   /content/drive/MyDrive/Colab Notebooks/data
PATH_Result: /content/drive/MyDrive/Colab Notebooks/results


# 4. Traitement des données

<u>Dans la suite de notre flux de travail, <br />
nous allons successivement</u> :
1. Préparer nos données
    1. Importer les images dans un dataframe **pandas UDF**
    2. Associer aux images leur **label**
    3. Préprocesser en **redimensionnant nos images pour <br />
       qu'elles soient compatibles avec notre modèle**
2. Préparer notre modèle
    1. Importer le modèle **MobileNetV2**
    2. Créer un **nouveau modèle** dépourvu de la dernière couche de MobileNetV2
3. Définir le processus de chargement des images et l'application <br />
   de leur featurisation à travers l'utilisation de pandas UDF
3. Exécuter les actions d'extraction de features
4. Enregistrer le résultat de nos actions
5. Tester le bon fonctionnement en chargeant les données enregistrées




### 4.1 Chargement des données

Les images sont chargées au format binaire, ce qui offre, <br />
plus de souplesse dans la façon de prétraiter les images.

Avant de charger les images, nous spécifions que nous voulons charger <br />
uniquement les fichiers dont l'extension est **jpg**.

Nous indiquons également de charger tous les objets possibles contenus <br />
dans les sous-dossiers du dossier communiqué.

In [None]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

<u>Affichage des 5 premières images contenant</u> :
 - le path de l'image
 - la date et heure de sa dernière modification
 - sa longueur
 - son contenu encodé en valeur hexadécimal

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [None]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------------------------------+-----+
|path                                                              |label|
+------------------------------------------------------------------+-----+
|file:/content/drive/MyDrive/Colab Notebooks/data/Cocos/118_100.jpg|Cocos|
|file:/content/drive/MyDrive/Colab Notebooks/data/Cocos/168_100.jpg|Cocos|
|file:/content/drive/MyDrive/Colab Notebooks/data/Cocos/143_100.jpg|Cocos|
|file:/content/drive/MyDrive/Colab Notebooks/data/Cocos/93_100.jpg |Cocos|
|file:/content/drive/MyDrive/Colab Notebooks/data/Cocos/218_100.jpg|Cocos|
+------------------------------------------------------------------+-----+
only showing top 5 rows

None


### 4.2 Préparation du modèle

Je vais utiliser la technique du **transfert learning** pour extraire les features des images.<br />
J'ai choisi d'utiliser le modèle **MobileNetV2** pour sa rapidité d'exécution comparée <br />
à d'autres modèles comme *VGG16* par exemple.

In [None]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5


In [None]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

Affichage du résumé de notre nouveau modèle où nous constatons <br />
que <u>nous récupérons bien en sortie un vecteur de dimension (1, 1, 1280)</u> :

In [None]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_1[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']        

Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids. <br />
Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser <br />
ensuite les poids aux différents workeurs.

In [None]:
brodcast_weights = sc.broadcast(new_model.get_weights())

<u>Mettons cela sous forme de fonction</u> :

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

### 4.3 Définition du processus de chargement des images et application <br/>de leur featurisation à travers l'utilisation de pandas UDF

Ce notebook définit la logique par étapes, jusqu'à Pandas UDF.

<u>L'empilement des appels est la suivante</u> :

- Pandas UDF
  - featuriser une série d'images pd.Series
   - prétraiter une image

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.

    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



### 4.4 Exécution des actions d'extraction de features

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), <br />
peuvent rencontrer des erreurs de type Out Of Memory (OOM).<br />
Si vous rencontrez de telles erreurs dans la cellule ci-dessous, <br />
essayez de réduire la taille du lot Arrow via 'maxRecordsPerBatch'

Je n'utiliserai pas cette commande dans ce projet <br />
et je laisse donc la commande en commentaire.

In [None]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.<br />
<u>REMARQUE</u> : Cela peut prendre beaucoup de temps, tout dépend du volume de données à traiter. <br />


In [None]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features"),
                                            featurize_udf("content").alias("features_vectors")).withColumn("features_vectors", array_to_vector("features_vectors"))

In [None]:
features_df.show(5, True)

+--------------------+------------+--------------------+--------------------+
|                path|       label|            features|    features_vectors|
+--------------------+------------+--------------------+--------------------+
|file:/content/dri...|   Carambula|[0.0017966543, 0....|[0.00179665430914...|
|file:/content/dri...|   Carambula|[0.063041806, 0.0...|[0.06304180622100...|
|file:/content/dri...|     Avocado|[0.45384344, 0.0,...|[0.45384344458580...|
|file:/content/dri...|Cactus fruit|[0.5492148, 0.088...|[0.54921478033065...|
|file:/content/dri...|  Grape Blue|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|
+--------------------+------------+--------------------+--------------------+
only showing top 5 rows



## 4.5 Application de la réduction de dimension

In [None]:
pca = PCA(k=100, inputCol = 'features_vectors', outputCol = 'pca_vectors')
model = pca.fit(features_df)
df_pca = model.transform(features_df)
df_pca.show(5, True)

+--------------------+------------+--------------------+--------------------+--------------------+
|                path|       label|            features|    features_vectors|         pca_vectors|
+--------------------+------------+--------------------+--------------------+--------------------+
|file:/content/dri...|   Carambula|[0.0017966543, 0....|[0.00179665430914...|[7.02984969477823...|
|file:/content/dri...|   Carambula|[0.063041806, 0.0...|[0.06304180622100...|[8.86773916807895...|
|file:/content/dri...|     Avocado|[0.45384344, 0.0,...|[0.45384344458580...|[-8.1079238361747...|
|file:/content/dri...|Cactus fruit|[0.5492148, 0.088...|[0.54921478033065...|[4.81757612642109...|
|file:/content/dri...|  Grape Blue|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-15.470275950760...|
+--------------------+------------+--------------------+--------------------+--------------------+
only showing top 5 rows



In [None]:
# ajout d'une colonne transformé vecteur ==> array
df_pca = df_pca.withColumn('pca_features', vector_to_array('pca_vectors'))
df_pca.show(5, True)

+--------------------+------------+--------------------+--------------------+--------------------+--------------------+
|                path|       label|            features|    features_vectors|         pca_vectors|        pca_features|
+--------------------+------------+--------------------+--------------------+--------------------+--------------------+
|file:/content/dri...|   Carambula|[0.0017966543, 0....|[0.00179665430914...|[7.02984969477823...|[7.02984969477823...|
|file:/content/dri...|   Carambula|[0.063041806, 0.0...|[0.06304180622100...|[8.86773916807895...|[8.86773916807895...|
|file:/content/dri...|     Avocado|[0.45384344, 0.0,...|[0.45384344458580...|[-8.1079238361747...|[-8.1079238361747...|
|file:/content/dri...|Cactus fruit|[0.5492148, 0.088...|[0.54921478033065...|[4.81757612642109...|[4.81757612642109...|
|file:/content/dri...|  Grape Blue|[0.0, 0.0, 0.0, 0...|[0.0,0.0,0.0,0.0,...|[-15.470275950760...|[-15.470275950760...|
+--------------------+------------+-----

<u>Rappel du PATH où seront inscrits les fichiers au format "**parquet**" <br />
contenant nos résultats, à savoir, un DataFrame contenant 3 colonnes</u> :
 1. Path des images
 2. Label de l'image
 3. Vecteur de caractéristiques de l'image
 4. Vecteur de caractéristiques de l'image au format vecteur
 5. Réduction de dimension l'image au format vecteur
 6. Réduction de dimension l'image

In [None]:
print(PATH_Result)

/content/drive/MyDrive/Colab Notebooks/results


<u>Enregistrement des données traitées au format "**parquet**"</u> :

In [None]:
df_pca.write.mode("overwrite").parquet(PATH_Result)