In [None]:
# KEEP THIS LINE AT THE TOP
cd ..

# 1. Préambule

## 1.1 Problématique

La très jeune start-up de l'AgriTech, nommée "**Fruits**!" cherche à proposer des solutions innovantes pour la récolte des fruits.

La volonté de l’entreprise est de préserver la biodiversité des fruits en permettant des traitements spécifiques pour chaque espèce de fruits en développant des robots cueilleurs intelligents.

La start-up souhaite dans un premier temps se faire connaître en mettant à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

Pour la start-up, cette application permettrait de sensibiliser le grand public à la biodiversité des fruits et de mettre en place une première version du moteur de classification des images de fruits.

De plus, le développement de l’application mobile permettra de construire une première version de l'architecture **Big Data** nécessaire.

## 1.2 Objectifs dans ce projet

1. Développer une première chaîne de traitement des données qui comprendra le **preprocessing** et une étape de **réduction de dimension**.
2. Tenir compte du fait que <u>le volume de données va augmenter très rapidement</u> après la livraison de ce projet, ce qui implique de:
 - Déployer le traitement des données dans un environnement **Big Data**
 - Développer les scripts en **pyspark** pour effectuer du **calcul distribué**

## 1.3 Déroulement des étapes du projet

Le projet va être réalisé en 2 temps, dans deux environnements différents. Nous allons dans un premier temps développer et exécuter notre code en local, en travaillant sur un nombre limité d'images à traiter.

Une fois les choix techniques validés, nous déploierons notre solution <dans un environnement Big Data en mode distribué.

<u>Pour cette raison, ce projet sera divisé en 3 parties</u>:
1. Liste des choix techniques généraux retenus
2. Déploiement de la solution en local
3. Déploiement de la solution dans le cloud

# 2. Choix techniques généraux retenus

## 2.1 Calcul distribué

L’énoncé du projet nous impose de développer des scripts en **pyspark** <afin de <u>prendre en compte l’augmentation très rapide du volume de donné après la livraison du projet</u>.

Pour comprendre rapidement et simplement ce qu’est **pyspark** et son principe de fonctionnement, nous vous conseillons de lire cet article : [PySpark : Tout savoir sur la librairie Python](https://datascientest.com/pyspark)

<u>Le début de l’article nous dit ceci </u>:<br />
« *Lorsque l’on parle de traitement de bases de données sur python, on pense immédiatement à la librairie pandas. Cependant, lorsqu’on a affaire à des bases de données trop massives, les calculs deviennent trop lents.Heureusement, il existe une autre librairie python, assez proche de pandas, qui permet de traiter des très grandes quantités de données : PySpark.Apache Spark est un framework open-source développé par l’AMPLab de UC Berkeley permettant de traiter des bases de données massives en utilisant le calcul distribué, technique qui consiste à exploiter plusieurs unités de calcul réparties en clusters au profit d’un seul projet afin de diviser le temps d’exécution d’une requête.Spark a été développé en Scala et est au meilleur de ses capacités dans son langage natif. Cependant, la librairie PySpark propose de l’utiliser avec le langage Python, en gardant des performances similaires à des implémentations en Scala.Pyspark est donc une bonne alternative à la librairie pandas lorsqu’on cherche à traiter des jeux de données trop volumineux qui entraînent <br />
des calculs trop chronophages.* »

Comme nous le constatons, **pySpark** est un moyen de communiquer avec **Spark** via le langage **Python**.**Spark**, quant à lui, est un outil qui permet de gérer et de coordonner l'exécution de tâches sur des données à travers un groupe d'ordinateurs. <u>Spark (ou Apache Spark) est un framework open source de calcul distribué in-memory pour le traitement et l'analyse de données massives</u>.

Un autre [article très intéressant et beaucoup plus complet pour comprendre le **fonctionnement de Spark**](https://www.veonum.com/apache-spark-pour-les-nuls/), ainsi que le rôle des **Spark Session** que nous utiliserons dans ce projet.

<u>Voici également un extrait</u>:

*Les applications Spark se composent d’un pilote (« driver process ») et de plusieurs exécuteurs (« executor processes »). Il peut être configuré pour être lui-même l’exécuteur (local mode) ou en utiliser autant que nécessaire pour traiter l’application, Spark prenant en charge la mise à l’échelle automatique par une configuration d’un nombre minimum et maximum d’exécuteurs.*

![Schéma de Spark](img/spark-schema.png)

*Le driver (parfois appelé « Spark Session ») distribue et planifie les tâches entre les différents exécuteurs qui les exécutent et permettent un traitement réparti. Il est le responsable de l’exécution du code sur les différentes machines.

Chaque exécuteur est un processus Java Virtual Machine (JVM) distinct dont il est possible de configurer le nombre de CPU et la quantité de mémoire qui lui est alloué. Une seule tâche peut traiter un fractionnement de données à la fois.*

Dans les deux environnements (Local et Cloud) nous utiliserons donc **Spark** et nous l’exploiterons à travers des scripts python grâce à **PySpark**.

Dans la <u>version locale</u> de notre script nous **simulerons le calcul distribué** afin de valider que notre solution fonctionne.Dans la <u>version cloud</u> nous **réaliserons les opérations sur un cluster de machine**.

## 2.2 Transfert Learning

L'énoncé du projet nous demande également de réaliser une première chaîne de traitement des données qui comprendra le preprocessing et <br />
une étape de réduction de dimension.

Il est également précisé qu'il n'est pas nécessaire d'entraîner un modèle pour le moment.

Nous décidons de partir sur une solution de **transfert learning**.

Simplement, le **transfert learning** consiste à utiliser la connaissance déjà acquise par un modèle entraîné (ici **MobileNetV2**) pour l'adapter à notre problématique.

Nous allons fournir au modèle nos images, et nous allons <u>récupérer l'avant dernière couche</u> du modèle. En effet la dernière couche de modèle est une couche softmax qui permet la classification des images ce que nous ne souhaitons pas dans ce projet.

L'avant dernière couche correspond à un **vecteur réduit** de dimension (1,1,1280).

Cela permettra de réaliser une première version du moteur pour la classification des images des fruits.

**MobileNetV2** a été retenu pour sa <u>rapidité d'exécution</u>, particulièrement adaptée pour le traitement d'un gros volume de données ainsi que la <u>faible dimensionnalité du vecteur de caractéristique en sortie</u> (1,1,1280)

# 3. Déploiement de la solution en local


## 3.1 Installation de Spark

[La première étape consiste à installer Spark ](https://computingforgeeks.com/how-to-install-apache-spark-on-ubuntu-debian/)

https://spark.apache.org/downloads.html

https://cedric.cnam.fr/vertigo/Cours/RCP216/installationSpark.html


## 3.2 Installation des packages

<u>On installe ensuite à l'aide de la commande **pip** les packages qui nous seront nécessaires</u> :

In [1]:
pip install numpy

Note: you may need to restart the kernel to use updated packages.


In [None]:
pip install daal==2021.4.0

In [2]:
pip install Pandas pillow pyspark pyarrow

Note: you may need to restart the kernel to use updated packages.


In [3]:
pip install tensorflow

Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install findspark

Note: you may need to restart the kernel to use updated packages.


## 3.4 Import des librairies

In [2]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os


from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

from pyspark.sql.types import *
from pyspark.sql.functions import udf
from pyspark.ml.linalg import Vectors, VectorUDT
import time
from pyspark.ml.feature import PCA

In [None]:

from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

In [1]:
import tensorflow as tf

: 

## 3.5 Définition des PATH pour charger les images <br /> et enregistrer les résultats

Dans cette version locale nous partons du principe que les données sont stockées dans le même répertoire que le notebook.Nous n'utilisons qu'un extrait de **300 images** à traiter dans cette première version en local.

L'extrait des images à charger est stockée dans le dossier **Test1**.Nous enregistrerons le résultat de notre traitement dans le dossier "**Results_Local**"

In [3]:
PATH = os.getcwd()
PATH_Data = PATH+'/data/Test1'
PATH_Result = PATH+'/data/Results_Local'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        /home/helene/PROJET_8/P8_env
PATH_Data:   /home/helene/PROJET_8/P8_env/data/Test1
PATH_Result: /home/helene/PROJET_8/P8_env/data/Results_Local


## 3.6 Création de la SparkSession

L’application Spark est contrôlée grâce à un processus de pilotage (driver process) appelé **SparkSession**. <u>Une instance de **SparkSession** est la façon dont Spark exécute les fonctions définies par l’utilisateur dans l’ensemble du cluster</u>. <u>Une SparkSession correspond toujours à une application Spark</u>.

<u>Ici nous créons une session spark en spécifiant dans l'ordre</u> :
 1. un **nom pour l'application**, qui sera affichée dans l'interface utilisateur Web Spark "**P8**"
 2. que l'application doit s'exécuter **localement**. Nous ne définissons pas le nombre de cœurs à utiliser (comme .master('local[4]) pour 4 cœurs à utiliser), nous utiliserons donc tous les cœurs disponibles dans notre processeur.
 3. une option de configuration supplémentaire permettant d'utiliser le **format "parquet"** que nous utiliserons pour enregistrer et charger le résultat de notre travail.
 4. vouloir **obtenir une session spark** existante ou si aucune n'existe, en créer une nouvelle

In [4]:
import findspark
findspark.init()

In [5]:
spark = (SparkSession
             .builder
             .appName('P8')
             .master('local')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate()
)

FileNotFoundError: [Errno 2] No such file or directory: '/lib/Python3.10/Site-packages/Tensorflow/./bin/spark-submit'

<u>Nous créons également la variable "**sc**" qui est un **SparkContext** issue de la variable **spark**</u> :

In [None]:
sc = spark.sparkContext

<u>Affichage des informations de Spark en cours d'execution</u> :

In [None]:
spark

## 3.7 Traitement des données

<u>Dans la suite de notre flux de travail, nous allons successivement</u> :
1. Préparer nos données
    1. Importer les images dans un dataframe **pandas UDF**
    2. Associer aux images leur **label**
    3. Préprocesser en **redimensionnant nos images pour qu'elles soient compatibles avec notre modèle**
2. Préparer notre modèle
    1. Importer le modèle **MobileNetV2**
    2. Créer un **nouveau modèle** dépourvu de la dernière couche de MobileNetV2
3. Définir le processus de chargement des images et l'application de leur featurisation à travers l'utilisation de pandas UDF
3. Exécuter les actions d'extraction de features
4. Enregistrer le résultat de nos actions
5. Tester le bon fonctionnement en chargeant les données enregistrées


### 3.7.1 Chargement des données

Les images sont chargées au format binaire, ce qui offre, plus de souplesse dans la façon de prétraiter les images.
Avant de charger les images, nous spécifions que nous voulons charger uniquement les fichiers dont l'extension est **jpg**.
Nous indiquons également de charger tous les objets possibles contenus dans les sous-dossiers du dossier communiqué.

In [None]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

<u>Affichage des 5 premières images contenant</u> :
 - le path de l'image
 - la date et heure de sa dernière modification
 - sa longueur
 - son contenu encodé en valeur hexadécimal

<u>Je ne conserve que le **path** de l'image et j'ajoute une colonne contenant les **labels** de chaque image</u> :

In [None]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

### 3.7.2 Préparation du modèle

Je vais utiliser la technique du **transfert learning** pour extraire les features des images. J'ai choisi d'utiliser le modèle **MobileNetV2** pour sa rapidité d'exécution comparée à d'autres modèles comme *VGG16* par exemple.

Pour en savoir plus sur la conception et le fonctionnement de MobileNetV2, je vous invite à lire [cet article](https://towardsdatascience.com/review-mobilenetv2-light-weight-model-image-classification-8febb490e61c).

<u>Voici le schéma de son architecture globale</u> : 

![Architecture de MobileNetV2](img/mobilenetv2_architecture.png)

Il existe une dernière couche qui sert à classer les images selon 1000 catégories que nous ne voulons pas utiliser.L'idée dans ce projet est de récupérer le **vecteur de caractéristiques de dimensions (1,1,1280)** qui servira, plus tard, au travers d'un moteur de classification à reconnaitre les différents fruits du jeu de données.

Comme d'autres modèles similaires, **MobileNetV2**, lorsqu'on l'utilise en incluant toutes ses couches, attend obligatoirement des images de dimension (224,224,3). Nos images étant toutes de dimension (100,100,3), nous devrons simplement les **redimensionner** avant de les confier au modèle.

<u>Dans l'odre</u> :
 1. Nous chargeons le modèle **MobileNetV2** avec les poids **précalculés** issus d'**imagenet** et en spécifiant le format de nos images en entrée
 2. Nous créons un nouveau modèle avec:
  - <u>en entrée</u> : l'entrée du modèle MobileNetV2
  - <u>en sortie</u> : l'avant dernière couche du modèle MobileNetV2

In [None]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [None]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

Affichage du résumé de notre nouveau modèle où nous constatons que <u>nous récupérons bien en sortie un vecteur de dimension (1, 1, 1280)</u> :

In [None]:
new_model.summary()

Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids. Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser ensuite les poids aux différents workeurs.

In [None]:
brodcast_weights = sc.broadcast(new_model.get_weights())

<u>Mettons cela sous forme de fonction</u> :

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

### 3.7.3 Définition du processus de chargement des images et application <br/>de leur featurisation à travers l'utilisation de pandas UDF

Ce notebook définit la logique par étapes, jusqu'à Pandas UDF.

<u>L'empilement des appels est la suivante</u> :

- Pandas UDF
  - featuriser une série d'images pd.Series
   - prétraiter une image

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

### 3.7.4 Exécution des actions d'extraction de features

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), peuvent rencontrer des erreurs de type Out Of Memory (OOM). Si vous rencontrez de telles erreurs dans la cellule ci-dessous, essayez de réduire la taille du lot Arrow via 'maxRecordsPerBatch'

Je n'utiliserai pas cette commande dans ce projet et je laisse donc la commande en commentaire.

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.
<u>REMARQUE</u> : Cela peut prendre beaucoup de temps, tout dépend du volume de données à traiter. <br />

Notre jeu de données de **Test** contient **22819 images**. Cependant, dans l'exécution en mode **local**, nous <u>traiterons un ensemble réduit de **3000 images**</u>.

In [None]:
features_df = images.repartition(20).select(col("path"), 
                                            ("label"),featurize_udf("content").alias("features"))

In [None]:
features_df.show(5, True)

<u>Rappel du PATH où seront inscrits les fichiers au format "**parquet**" <contenant nos résultats, à savoir, un DataFrame contenant 3 colonnes</u> :
 1. Path des images
 2. Label de l'image
 3. Vecteur de caractéristiques de l'image

In [None]:
print(PATH_Result)

<u>Enregistrement des données traitées au format "**parquet**"</u> :

In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

## 3.8 Réduction de dimension

#### Transformation de Tableau à vecteurs pour mise en application réduction

In [None]:
array_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

In [None]:
vectorise_df = features_df.withColumn('cnn_vectors', array_to_vector_udf('features'))

In [None]:
vectorise_df.show(5, True)

####  Initalisation et application PCA

In [None]:
# reduce with PCA - set k Max to determine the adequate nb of principal components
start = time.perf_counter()
pca = PCA(k=20, inputCol='cnn_vectors', outputCol='pca_vectors')
model = pca.fit(vectorise_df)
stop = time.perf_counter()
print(f'pca - fit best k nb, elapsed time: {stop - start:0.2f}s')

In [None]:
# application réduction PCA
start = time.perf_counter()
reduced_df = model.transform(vectorise_df)
stop = time.perf_counter()
print(f'pca - application, elapsed time: {stop - start:0.2f}s')

In [None]:
reduced_df.show(5, True)

####  Transformation inverse : de Vecteur à Tableau - Pandas

In [None]:
vector_to_array_udf = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))

In [None]:
final_df = reduced_df.withColumn('pca_features', vector_to_array_udf('pca_vectors'))

In [None]:
final_df.show(5, True)

## 3.9 Chargement des données enregistrées et validation du résultat

<u>On charge les données fraichement enregistrées dans un **DataFrame Pandas**</u> :

In [None]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [None]:
df.head()

<u>On valide que la dimension du vecteur de caractéristiques des images est bien de dimension 1280</u> :

In [None]:
df.loc[0,'features'].shape

Nous venons de valider le processus sur un jeu de données allégé en local où nous avons simulé un cluster de machines en répartissant la charge de travail sur différents cœurs de processeur au sein d'une même machine.

Nous allons maintenant généraliser le processus en déployant notre solution sur un réel cluster de machines et nous travaillerons désormais sur la totalité des 22819 images de notre dossier "Test".