#  <span style='color:DarkBlue'>P8 - Deployer un modèle dans le Cloud</span>

<div style="text-align:center">
    <img src="images/logo_P8.png" width="50%">
</div>

# P8_01_02 - CLOUD - Utilisation d'AWS

Ce notebook traite de du chargement du jeu de données des images, du pré-processing, de la réduction de dimension et d'une mini classification pour des nouvelles images en utilisant l'outils **Amazon Web Services** pour la partie "**compute**" (calculs distribués) et un **container blob de stockage** "**Data Lake Storage**".

# 1. Sommaire :

**1. Préambule**<br />
&emsp;1.1 Problématique<br />
&emsp;1.2 Objectifs dans ce projet<br />
&emsp;1.3 Déroulement des étapes du projet<br />
**2. Choix techniques généraux retenus**<br />
&emsp;2.1 Calcul distribué<br />
&emsp;2.2 Transfert Learning<br />
**3. Déploiement de la solution en local**<br />
&emsp;3.1 Environnement de travail<br />
&emsp;3.2 Installation de Spark<br />
&emsp;3.3 Installation des packages<br />
&emsp;3.4 Import des librairies<br />
&emsp;3.5 Définition des PATH pour charger les images et enregistrer les résultats<br />
&emsp;3.6 Création de la SparkSession<br />
&emsp;3.7 Traitement des données<br />
&emsp;&emsp;3.7.1 Chargement des données<br />
&emsp;&emsp;3.7.2 Préparation du modèle<br />
&emsp;&emsp;3.7.3 Définition du processus de chargement des images et application <br />
&emsp;&emsp;&emsp;&emsp;&emsp;de leur featurisation à travers l'utilisation de pandas UDF<br />
&emsp;&emsp;3.7.4 Exécution des actions d'extractions de features<br />
&emsp;3.8 Chargement des données enregistrées et validation du résultat<br />
**4. Déploiement de la solution sur le cloud**<br />
&emsp;4.1 Choix du prestataire cloud : AWS<br />
&emsp;4.2 Choix de la solution technique : EMR<br />
&emsp;4.3 Choix de la solution de stockage des données : Amazon S3<br />
&emsp;4.4 Configuration de l'environnement de travail<br />
&emsp;4.5 Upload de nos données sur S3<br />
&emsp;4.6 Configuration du serveur EMR<br />
&emsp;&emsp;4.6.1 Étape 1 : Logiciels et étapes<br />
&emsp;&emsp;&emsp;4.6.1.1 Configuration des logiciels<br />
&emsp;&emsp;&emsp;4.6.1.2 Modifier les paramètres du logiciel<br />
&emsp;&emsp;4.6.2 Étape 2 : Matériel<br />
&emsp;&emsp;4.6.3 Étape 3 : Paramètres de cluster généraux<br />
&emsp;&emsp;&emsp;4.6.3.1 Options générales<br />
&emsp;&emsp;&emsp;4.6.3.2 Actions d'amorçage<br />
&emsp;&emsp;4.6.4 Étape 4 : Sécurité<br />
&emsp;&emsp;&emsp;4.6.4.1 Options de sécurité<br />
&emsp;4.7 Instanciation du serveur<br />
&emsp;4.8 Création du tunnel SSH à l'instance EC2 (Maître)<br />
&emsp;&emsp;4.8.1 Création des autorisations sur les connexions entrantes<br />
&emsp;&emsp;4.8.2 Création du tunnel ssh vers le Driver<br />
&emsp;&emsp;4.8.3 Configuration de FoxyProxy<br />
&emsp;&emsp;4.8.4 Accès aux applications du serveur EMR via le tunnel ssh<br />
&emsp;4.9 Connexion au notebook JupyterHub<br />
&emsp;4.10 Exécution du code<br />
&emsp;&emsp;4.10.1 Démarrage de la session Spark<br />
&emsp;&emsp;4.10.2 Installation des packages<br />
&emsp;&emsp;4.10.3 Import des librairies<br />
&emsp;&emsp;4.10.4 Définition des PATH pour charger les images et enregistrer les résultats<br />
&emsp;&emsp;4.10.5 Traitement des données<br />
&emsp;&emsp;&emsp;4.10.5.1 Chargement des données<br />
&emsp;&emsp;&emsp;4.10.5.2 Préparation du modèle<br />
&emsp;&emsp;&emsp;4.10.5.3 Définition du processus de chargement des images<br />
&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;&emsp;et application de leur featurisation à travers l'utilisation de pandas UDF<br />
&emsp;&emsp;&emsp;4.10.5.4 Exécutions des actions d'extractions de features<br />
&emsp;&emsp;4.10.6 Chargement des données enregistrées et validation du résultat<br />
&emsp;4.11 Suivi de l'avancement des tâches avec le Serveur d'Historique Spark<br />
&emsp;4.12 Résiliation de l'instance EMR<br />
&emsp;4.13 Cloner le serveur EMR (si besoin)<br />
&emsp;4.14 Arborescence du serveur S3 à la fin du projet<br />
**5. Conclusion**

# 2. Préambule

## 2.1 Problématique

La très jeune start-up de l'AgriTech, nommée "**Fruits**!", <br />
cherche à proposer des solutions innovantes pour la récolte des fruits.

La volonté de l’entreprise est de préserver la biodiversité des fruits <br />
en permettant des traitements spécifiques pour chaque espèce de fruits <br />
en développant des robots cueilleurs intelligents.

La start-up souhaite dans un premier temps se faire connaître en mettant <br />
à disposition du grand public une application mobile qui permettrait aux <br />
utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

Pour la start-up, cette application permettrait de sensibiliser le grand public <br /> 
à la biodiversité des fruits et de mettre en place une première version du moteur <br />
de classification des images de fruits.

De plus, le développement de l’application mobile permettra de construire <br />
une première version de l'architecture **Big Data** nécessaire.

## 2.2 Objectifs dans ce projet

1. Développer une première chaîne de traitement des données qui <br />
   comprendra le **preprocessing** et une étape de **réduction de dimension**.
2. Tenir compte du fait que <u>le volume de données va augmenter <br />
   très rapidement</u> après la livraison de ce projet, ce qui implique de:
 - Déployer le traitement des données dans un environnement **Big Data**
 - Développer les scripts en **pyspark** pour effectuer du **calcul distribué**

## 2.3 Déroulement des étapes du projet

Le projet va être réalisé en 2 temps, dans deux environnements différents. <br />
Nous allons dans un premier temps développer et exécuter notre code en local, <br />
en travaillant sur un nombre limité d'images à traiter.

Une fois les choix techniques validés, nous déploierons notre solution <br />
dans un environnement Big Data en mode distribué.

<u>Pour cette raison, ce projet sera divisé en 3 parties</u>:
1. Liste des choix techniques généraux retenus
2. Déploiement de la solution en local
3. Déploiement de la solution dans le cloud

# 3. Choix techniques généraux retenus

## 3.1 Calcul distribué

L’énoncé du projet nous impose de développer des scripts en **pyspark** <br />
afin de <u>prendre en compte l’augmentation très rapide du volume <br />
de donné après la livraison du projet</u>.

Pour comprendre rapidement et simplement ce qu’est **pyspark** <br />
et son principe de fonctionnement, nous vous conseillons de lire <br />
cet article : [PySpark : Tout savoir sur la librairie Python](https://datascientest.com/pyspark)

<u>Le début de l’article nous dit ceci </u>:<br />
« *Lorsque l’on parle de traitement de bases de données sur python, <br />
on pense immédiatement à la librairie pandas. Cependant, lorsqu’on a <br />
affaire à des bases de données trop massives, les calculs deviennent trop lents.<br />
Heureusement, il existe une autre librairie python, assez proche <br />
de pandas, qui permet de traiter des très grandes quantités de données : PySpark.<br />
Apache Spark est un framework open-source développé par l’AMPLab <br />
de UC Berkeley permettant de traiter des bases de données massives <br />
en utilisant le calcul distribué, technique qui consiste à exploiter <br />
plusieurs unités de calcul réparties en clusters au profit d’un seul <br />
projet afin de diviser le temps d’exécution d’une requête.<br />
Spark a été développé en Scala et est au meilleur de ses capacités <br />
dans son langage natif. Cependant, la librairie PySpark propose de <br />
l’utiliser avec le langage Python, en gardant des performances <br />
similaires à des implémentations en Scala.<br />
Pyspark est donc une bonne alternative à la librairie pandas lorsqu’on <br />
cherche à traiter des jeux de données trop volumineux qui entraînent <br />
des calculs trop chronophages.* »

Comme nous le constatons, **pySpark** est un moyen de communiquer <br />
avec **Spark** via le langage **Python**.<br />
**Spark**, quant à lui, est un outil qui permet de gérer et de coordonner <br />
l'exécution de tâches sur des données à travers un groupe d'ordinateurs. <br />
<u>Spark (ou Apache Spark) est un framework open source de calcul distribué <br />
in-memory pour le traitement et l'analyse de données massives</u>.

Un autre [article très intéressant et beaucoup plus complet pour <br />
comprendre le **fonctionnement de Spark**](https://www.veonum.com/apache-spark-pour-les-nuls/), ainsi que le rôle <br />
des **Spark Session** que nous utiliserons dans ce projet.

<u>Voici également un extrait</u>:

*Les applications Spark se composent d’un pilote (« driver process ») <br />
et de plusieurs exécuteurs (« executor processes »). Il peut être configuré <br />
pour être lui-même l’exécuteur (local mode) ou en utiliser autant que <br />
nécessaire pour traiter l’application, Spark prenant en charge la mise <br />
à l’échelle automatique par une configuration d’un nombre minimum <br />
et maximum d’exécuteurs.*

![Schéma de Spark](img/spark-schema.png)

*Le driver (parfois appelé « Spark Session ») distribue et planifie <br />
les tâches entre les différents exécuteurs qui les exécutent et permettent <br />
un traitement réparti. Il est le responsable de l’exécution du code <br />
sur les différentes machines.

Chaque exécuteur est un processus Java Virtual Machine (JVM) distinct <br />
dont il est possible de configurer le nombre de CPU et la quantité de <br />
mémoire qui lui est alloué. <br />
Une seule tâche peut traiter un fractionnement de données à la fois.*

Dans les deux environnements (Local et Cloud) nous utiliserons donc **Spark** <br />
et nous l’exploiterons à travers des scripts python grâce à **PySpark**.

Dans la <u>version locale</u> de notre script nous **simulerons <br />
le calcul distribué** afin de valider que notre solution fonctionne.<br />
Dans la <u>version cloud</u> nous **réaliserons les opérations sur un cluster de machine**.

## 4.1 Transfert Learning

L'énoncé du projet nous demande également de <br />
réaliser une première chaîne de traitement <br />
des données qui comprendra le preprocessing et <br />
une étape de réduction de dimension.

Il est également précisé qu'il n'est pas nécessaire <br />
d'entraîner un modèle pour le moment.

Nous décidons de partir sur une solution de **transfert learning**.

Simplement, le **transfert learning** consiste <br />
à utiliser la connaissance déjà acquise <br />
par un modèle entraîné (ici **MobileNetV2**) pour <br />
l'adapter à notre problématique.

Nous allons fournir au modèle nos images, et nous allons <br />
<u>récupérer l'avant dernière couche</u> du modèle.<br />
En effet la dernière couche de modèle est une couche softmax <br />
qui permet la classification des images ce que nous ne <br />
souhaitons pas dans ce projet.

L'avant dernière couche correspond à un **vecteur <br />
réduit** de dimension (1,1,1280).

Cela permettra de réaliser une première version du moteur <br />
pour la classification des images des fruits.

**MobileNetV2** a été retenu pour sa <u>rapidité d'exécution</u>, <br />
particulièrement adaptée pour le traitement d'un gros volume <br />
de données ainsi que la <u>faible dimensionnalité du vecteur <br />
de caractéristique en sortie</u> (1,1,1280)

# 5. Déploiement de la solution en local


## 5.1 Environnement de travail

Pour des raisons de simplicité, nous développons dans un environnement <br />
Linux Unbuntu (exécuté depuis une machine Windows dans une machine virtuelle)
* Pour installer une machine virtuelle :  https://www.malekal.com/meilleurs-logiciels-de-machine-virtuelle-gratuits-ou-payants/

## 5.2 Installation de Spark

[La première étape consiste à installer Spark ](https://computingforgeeks.com/how-to-install-apache-spark-on-ubuntu-debian/)

## 5.3 Installation des packages

<u>On installe ensuite à l'aide de la commande **pip** <br />
les packages qui nous seront nécessaires</u> :

## 6. Introduction

*****
**Mission**
*****
**Développer dans un environnement Big Data une première chaîne de traitement des données qui comprendra le preprocessing et une étape de réduction de dimension** pour une startup Fruits! de l'AgriTech pour mettre à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

*****
**Contraintes**
*****
- Le volume de données va augmenter très rapidement après la livraison de ce projet.
- Développer des scripts en Pyspark.
- Utiliser le cloud AWS ou autre (Microsoft Azure sera utilisé pour ce projet) pour profiter d’une architecture Big Data. 

*****
**Sources**
*****
- [Jeu de données](https://www.kaggle.com/moltean/fruits) : constitué des images de fruits et des labels associés, qui pourra servir de point de départ pour construire une partie de la chaîne de traitement des données.

**Information sur le jeu de données :**
***

- Nombre **total d'images** : **90483**.
- Jeu de données **train set** : **67692** images (1 fruit ou 1 légumes par image).
- Jeu de données **test set** : **22688**  images (1 fruit ou 1 légumes par image).
- Nombre de **classes** : 131 (fruits ou légumes).
- **Taille** des images : 100x100 pixels.
- **Format du nom de fichier** : 
  - **imageindex100.jpg** (par exemple 32100.jpg),
  - ou **rimageindex100.jpg** (par exemple r32100.jpg),
  - ou **r2imageindex100.jpg**,
  - ou **r3imageindex100.jpg**. 
     - ou "r" signifie que le fruit a subi une rotation,
     - "r2" signifie que le fruit a été tourné autour du 3ème axe,
     - "100" vient de la taille de l'image (100x100 pixels).
- Exemples de classe : Apples (different varieties: Crimson Snow, Golden, Golden-Red, Granny Smith, Pink Lady, Red, Red Delicious), Apricot, Avocado, Avocado ripe, Banana (Yellow, Red, Lady Finger), Beetroot Red, Blueberry, Cactus fruit, Cantaloupe (2 varieties), Carambula, Cauliflower, Cherry (different varieties, Rainier), Cherry Wax (Yellow, Red, Black), Chestnut, Clementine, Cocos, Corn (with husk), Cucumber (ripened), Dates, Eggplant, Fig, Ginger Root, Granadilla, Grape (Blue, Pink, White (different varieties)), Grapefruit (Pink, White), Guava, Hazelnut, Huckleberry, Kiwi, Kaki, Kohlrabi, Kumsquats, Lemon (normal, Meyer), Lime, Lychee, Mandarine, Mango (Green, Red), Mangostan, Maracuja, Melon Piel de Sapo, Mulberry, Nectarine (Regular, Flat), Nut (Forest, Pecan), Onion (Red, White), Orange, Papaya, Passion fruit, Peach (different varieties), Pepino, Pear (different varieties, Abate, Forelle, Kaiser, Monster, Red, Stone, Williams), Pepper (Red, Green, Orange, Yellow), Physalis (normal, with Husk), Pineapple (normal, Mini), Pitahaya Red, Plum (different varieties), Pomegranate, Pomelo Sweetie, Potato (Red, Sweet, White), Quince, Rambutan, Raspberry, Redcurrant, Salak, Strawberry (normal, Wedge), Tamarillo, Tangelo, Tomato (different varieties, Maroon, Cherry Red, Yellow, not ripened, Heart), Walnut, Watermelon.

### Démarrage de la session Spark

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1725884941093_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### Affichage des informations de la Session Spark en cours

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1725884941093_0001,pyspark,idle,Link,Link,,✔


In [3]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, element_at
from pyspark.ml.feature import PCA, StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 6.1 Définition des PATH pour charger les images <br /> et enregistrer les résultats

Dans cette version locale nous partons du principe que les données <br />
sont stockées dans le même répertoire que le notebook.<br />
Nous n'utilisons qu'un extrait de **300 images** à traiter dans cette <br />
première version en local.<br />
L'extrait des images à charger est stockée dans le dossier **Test**.<br />
Nous enregistrerons le résultat de notre traitement <br />
dans le dossier "**Validation**"

In [4]:
PATH = 's3://p08-projet-data'
PATH_Data =  PATH+"/Test"
PATH_Result =  PATH+"/Validation"
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://p08-projet-data
PATH_Data:   s3://p08-projet-data/Test
PATH_Result: s3://p08-projet-data/Validation

## Traitement des données

#### Lecture en format .jpg, et chargement des images en dataframe Spark (traitement distribué).

In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### Extraction, et ajout des noms de fruits dans la colonne label.

In [6]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(30,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------------+-----------+
|path                                            |label      |
+------------------------------------------------+-----------+
|s3://p08-projet-data/Test/apple_hit_1/r0_115.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_119.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_107.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_143.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_111.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_127.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_139.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_123.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_151.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_h

In [7]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+-----------+
|                path|   modificationTime|length|             content|      label|
+--------------------+-------------------+------+--------------------+-----------+
|s3://p08-projet-d...|2024-08-09 16:55:30|125135|[FF D8 FF E0 00 1...|apple_hit_1|
|s3://p08-projet-d...|2024-08-09 16:55:31|124785|[FF D8 FF E0 00 1...|apple_hit_1|
|s3://p08-projet-d...|2024-08-09 16:55:27|123514|[FF D8 FF E0 00 1...|apple_hit_1|
|s3://p08-projet-d...|2024-08-09 16:55:36|122958|[FF D8 FF E0 00 1...|apple_hit_1|
|s3://p08-projet-d...|2024-08-09 16:55:29|122807|[FF D8 FF E0 00 1...|apple_hit_1|
+--------------------+-------------------+------+--------------------+-----------+
only showing top 5 rows

In [8]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------------+-----------+
|path                                            |label      |
+------------------------------------------------+-----------+
|s3://p08-projet-data/Test/apple_hit_1/r0_115.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_119.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_107.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_143.jpg|apple_hit_1|
|s3://p08-projet-data/Test/apple_hit_1/r0_111.jpg|apple_hit_1|
+------------------------------------------------+-----------+
only showing top 5 rows

None

## Instanciation du modèle

In [9]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [10]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

####  Diffusion des poids du modèle MobileNetV2 à tous les nœuds du cluster Spark en utilisant une variable partagée (broadcast)

In [12]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

####  Définition du processus de chargement des images et application de leur featurisation

In [14]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



#### Featurisation par batch du dataframe Spark (un batch représentant un dataframe Pandas d'images de type Series)

In [15]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
features_df.dtypes

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

[('path', 'string'), ('label', 'string'), ('features', 'array<float>')]

#### Déscription de la featurisation de chaque label d'images sous forme de Dataframe

In [17]:
features_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+
|                path|               label|            features|
+--------------------+--------------------+--------------------+
|s3://p08-projet-d...|         apple_hit_1|[0.07385951, 0.45...|
|s3://p08-projet-d...|         apple_hit_1|[0.9009807, 0.192...|
|s3://p08-projet-d...|         apple_hit_1|[0.0, 0.24412937,...|
|s3://p08-projet-d...|         apple_hit_1|[0.941112, 0.0722...|
|s3://p08-projet-d...|         apple_hit_1|[0.006548152, 0.1...|
|s3://p08-projet-d...|              pear_3|[0.5223791, 0.240...|
|s3://p08-projet-d...|         apple_hit_1|[0.48257026, 0.43...|
|s3://p08-projet-d...|              pear_3|[0.40664485, 0.11...|
|s3://p08-projet-d...|              pear_3|[0.5683563, 0.022...|
|s3://p08-projet-d...|         apple_red_3|[0.68562263, 0.0,...|
|s3://p08-projet-d...|          cucumber_3|[0.64268076, 0.79...|
|s3://p08-projet-d...|         apple_red_3|[0.6739433, 0.036...|
|s3://p08-projet-d...|app

## Transformation de l'array des features en Vector

#### Pre-processing  des features au format colonne.

In [18]:
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
df_with_vectors = features_df.select(features_df["path"],
                                     features_df["label"],
                                     list_to_vector_udf(features_df["features"]).alias("features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
df_with_vectors.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+
|                path|               label|            features|
+--------------------+--------------------+--------------------+
|s3://p08-projet-d...|         apple_hit_1|[0.24259111285209...|
|s3://p08-projet-d...|         apple_hit_1|[0.98785418272018...|
|s3://p08-projet-d...|         apple_hit_1|[0.14053782820701...|
|s3://p08-projet-d...|     cabbage_white_1|[0.0,0.5088325142...|
|s3://p08-projet-d...|         apple_hit_1|[0.94111198186874...|
|s3://p08-projet-d...|         apple_hit_1|[0.00654815183952...|
|s3://p08-projet-d...|         apple_hit_1|[1.00341403484344...|
|s3://p08-projet-d...|              pear_3|[0.40664485096931...|
|s3://p08-projet-d...|              pear_3|[0.56835627555847...|
|s3://p08-projet-d...|              pear_3|[0.70505595207214...|
|s3://p08-projet-d...|         apple_red_3|[0.67394328117370...|
|s3://p08-projet-d...|apple_crimson_snow_1|[0.02197414450347...|
|s3://p08-projet-d...|   

## Standardisation 

#### Standardisation des vecteurs de features associés à chaque image

In [21]:
scaler = StandardScaler(
    inputCol = 'features',
    outputCol = 'scaledFeatures',
    withMean = True,
    withStd = True
).fit(df_with_vectors)

df_scaled = scaler.transform(df_with_vectors)
df_scaled.show(6)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------------+--------------------+--------------------+
|                path|          label|            features|      scaledFeatures|
+--------------------+---------------+--------------------+--------------------+
|s3://p08-projet-d...|    apple_hit_1|[0.24259111285209...|[-0.3028012314020...|
|s3://p08-projet-d...|    apple_hit_1|[0.98785418272018...|[1.42220802088516...|
|s3://p08-projet-d...|    apple_hit_1|[0.14053782820701...|[-0.5390169616007...|
|s3://p08-projet-d...|cabbage_white_1|[0.0,0.5088325142...|[-0.8643102221354...|
|s3://p08-projet-d...|    apple_hit_1|[0.24671451747417...|[-0.2932570698681...|
|s3://p08-projet-d...|cabbage_white_1|[0.0,1.0982660055...|[-0.8643102221354...|
+--------------------+---------------+--------------------+--------------------+
only showing top 6 rows

## Réduction de dimension

In [22]:
pca = PCA(k=2, inputCol="scaledFeatures", outputCol="pcaFeatures")
model = pca.fit(df_scaled)
result = model.transform(df_scaled)
print('Explained Variance Ratio', model.explainedVariance.toArray())
result.show(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Explained Variance Ratio [0.11351862 0.06580965]
+--------------------+---------------+--------------------+--------------------+--------------------+
|                path|          label|            features|      scaledFeatures|         pcaFeatures|
+--------------------+---------------+--------------------+--------------------+--------------------+
|s3://p08-projet-d...|    apple_hit_1|[0.07385951280593...|[-0.6933526796099...|[2.47156306968284...|
|s3://p08-projet-d...|    apple_hit_1|[0.90098071098327...|[1.22112796126138...|[7.12050062130514...|
|s3://p08-projet-d...|    apple_hit_1|[0.24671451747417...|[-0.2932570698681...|[6.62779819883693...|
|s3://p08-projet-d...|cabbage_white_1|[0.0,1.0982660055...|[-0.8643102221354...|[20.4530928506329...|
|s3://p08-projet-d...|    apple_hit_1|[1.00341403484344...|[1.45822334213487...|[7.48031783818804...|
|s3://p08-projet-d...|         pear_3|[0.40664485096931...|[0.07692269101720...|[-6.9077826873077...|
|s3://p08-projet-d...|         pe

## PATH où seront inscrits les fichiers au format "parquet"

In [23]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://p08-projet-data/Validation

## Enregistrement des données traitées (format "parquet")

In [24]:
result.write.mode("overwrite").parquet(PATH_Result+'/parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Enregistrement des données traitées (format "csv")

In [25]:
pca_df = result.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
pca_df

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                  path  ...                                pcaFeatures
0     s3://p08-projet-data/Test/apple_hit_1/r0_175.jpg  ...  [2.4715630696828406, -11.594201652340674]
1     s3://p08-projet-data/Test/apple_hit_1/r1_263.jpg  ...   [7.120500621305144, -14.576393567307049]
2     s3://p08-projet-data/Test/apple_hit_1/r2_163.jpg  ...   [1.0671678241626024, -13.36204269412166]
3     s3://p08-projet-data/Test/apple_hit_1/r1_155.jpg  ...  [-1.332690500927604, -10.912372748727593]
4     s3://p08-projet-data/Test/apple_hit_1/r2_271.jpg  ...    [5.01081513848441, -15.897631966119318]
...                                                ...  ...                                        ...
3105      s3://p08-projet-data/Test/apple_6/r1_179.jpg  ...  [0.48068530636768014, -6.644619859468527]
3106       s3://p08-projet-data/Test/apple_6/r1_75.jpg  ...  [-1.1384255322765977, -6.103760404375136]
3107      s3://p08-projet-data/Test/apple_6/r0_231.jpg  ...  [-1.40113176

In [27]:
pca_df.to_csv(PATH_Result+'/'+'result.csv')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Chargement des données enregistrées et validation du résultat

In [28]:
dfs = spark.read.parquet(PATH_Result+'/parquet')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
df = dfs.toPandas()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Affichage partielle du dataframe

In [30]:
df.head(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                               path  ...                                 pcaFeatures
0  s3://p08-projet-data/Test/apple_hit_1/r0_179.jpg  ...  [0.10636431522734317, -11.122855579103772]
1  s3://p08-projet-data/Test/apple_hit_1/r2_171.jpg  ...   [1.9206178539053012, -13.562380455596323]
2  s3://p08-projet-data/Test/apple_hit_1/r2_219.jpg  ...   [0.9618222427576699, -12.182892769446788]
3  s3://p08-projet-data/Test/apple_hit_1/r0_215.jpg  ...  [-1.3552467770578076, -12.011548832286111]
4  s3://p08-projet-data/Test/apple_hit_1/r1_143.jpg  ...   [-2.7216517194804357, -9.763679068877822]

[5 rows x 5 columns]

In [31]:
df.loc[0,'pcaFeatures'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(2,)