# **Déployez un modèle dans le cloud**

# Sommaire
    
* **✔ 1. [Préambule](#introduction)**
    * ✔ 1.1 [Problématique](#problem-statement)
    * ✔ 1.2 Objectifs dans ce projet
    * ✔ 1.3 Déroulement des étapes du projet
* **2. Choix techniques généraux retenus**
    * 2.1 Calcul distribué
    * 2.2 Transfert Learning
* **3. Déploiement de la solution en local**
    * 3.1 Environnement de travail
    * 3.2 Installation de Spark
    * 3.3 Installation des packages
    * 3.4 Import des librairies
    * 3.5 Définition des PATH pour charger les images et enregistrer les résultats
    * 3.6 Création de la SparkSession
    * 3.7 Traitement des données
        * 3.7.1 Chargement des données
        * 3.7.2 Préparation du modèle
        * 3.7.3 Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF
        * 3.7.4 Exécution des actions d'extractions de features
    * 3.8 Chargement des données enregistrées et validation du résultat
* **4. Déploiement de la solution sur le cloud**
    * 4.1 Choix du prestataire cloud : AWS
    * 4.2 Choix de la solution technique : EMR
    * 4.3 Choix de la solution de stockage des données : Amazon S3
    * 4.4 Configuration de l'environnement de travail
    * 4.5 Upload de nos données sur S3
    * 4.6 Configuration du serveur EMR
        * 4.6.1 Étape 1 : Logiciels et étapes
            * 4.6.1.1 Configuration des logiciels
            * 4.6.1.2 Modifier les paramètres du logiciel
        * 4.6.2 Étape 2 : Matériel
        * 4.6.3 Étape 3 : Paramètres de cluster généraux
            * 4.6.3.1 Options générales
            * 4.6.3.2 Actions d'amorçage
        * 4.6.4 Étape 4 : Sécurité
            * 4.6.4.1 Options de sécurité
    * 4.7 Instanciation du serveur
    * 4.8 Création du tunnel SSH à l'instance EC2 (Maître)
        * 4.8.1 Création des autorisations sur les connexions entrantes
        * 4.8.2 Création du tunnel ssh vers le Driver
        * 4.8.3 Configuration de FoxyProxy
        * 4.8.4 Accès aux applications du serveur EMR via le tunnel ssh
    * 4.9 Connexion au notebook JupyterHub
    * 4.10 Exécution du code
        * 4.10.1 Démarrage de la session Spark
        * 4.10.2 Installation des packages
        * 4.10.3 Import des librairies
        * 4.10.4 Définition des PATH pour charger les images et enregistrer les résultats
        * 4.10.5 Traitement des données
            * 4.10.5.1 Chargement des données
            * 4.10.5.2 Préparation du modèle
            * 4.10.5.3 Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF
            * 4.10.5.4 Exécutions des actions d'extractions de features
        * 4.10.6 Chargement des données enregistrées et validation du résultat
    * 4.11 Suivi de l'avancement des tâches avec le Serveur d'Historique Spark
    * 4.12 Résiliation de l'instance EMR
    * 4.13 Cloner le serveur EMR (si besoin)
    * 4.14 Arborescence du serveur S3 à la fin du projet
* **5. Conclusion**

# <a id="introduction"></a> 1. **Préambule**

## <a id="problem-statement"></a> 1.1 Problématique

La très jeune start-up de l'AgriTech, nommée "**Fruits**!", cherche à proposer des solutions innovantes pour la récolte des fruits.

La volonté de l’entreprise est de **préserver la biodiversité des fruits** en permettant des traitements spécifiques pour chaque espèce de fruits en développant des robots cueilleurs intelligents.

La start-up souhaite dans un premier temps se faire connaître en mettant à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

Pour la start-up, cette application permettrait de **sensibiliser le grand public** à la biodiversité des fruits et de mettre en place une première version du moteur de classification des images de fruits.

De plus, le développement de l’**application mobile** permettra de construire une première version de l'architecture **Big Data** nécessaire.

## 1.2 Objectifs dans ce projet

1. Développer une première chaîne de traitement des données qui comprendra :
   * le **preprocessing**,
   * et une étape de **réduction de dimension**.
2. Tenir compte du fait que le volume de données va augmenter très rapidement après la livraison de ce projet, ce qui implique de:
   * Déployer le traitement des données dans un **environnement Big Data**
   * Développer les scripts en **pyspark** pour effectuer du **calcul distribué**

## 1.3 Déroulement des étapes du projet

Le projet va être réalisé en 2 temps, dans deux environnements différents.

Nous allons dans un premier temps développer et exécuter notre code en local, en travaillant sur un nombre limité d'images à traiter.

Une fois les choix techniques validés, nous déploierons notre solution dans un environnement Big Data en mode distribué.

<u>Pour cette raison, ce projet sera divisé en 3 parties</u>:
1. Liste des choix techniques généraux retenus
2. Déploiement de la solution en local
3. Déploiement de la solution dans le cloud

# 2\. **Choix techniques généraux retenus**

## 2.1 Calcul distribué

L’énoncé du projet nous impose de développer des scripts en **pyspark** afin de <u>prendre en compte l’augmentation très rapide du volume de donné après la livraison du projet</u>.

Pour comprendre rapidement et simplement ce qu’est **pyspark** et son principe de fonctionnement, nous vous conseillons de lire cet article : [PySpark : Tout savoir sur la librairie Python](https://datascientest.com/pyspark)

<u>Le début de l’article nous dit ceci </u>:

« *Lorsque l’on parle de traitement de bases de données sur python, on pense immédiatement à la librairie pandas. Cependant, lorsqu’on a affaire à des bases de données trop massives, les calculs deviennent trop lents. Heureusement, il existe une autre librairie python, assez proche de pandas, qui permet de traiter des très grandes quantités de données : PySpark. Apache Spark est un framework open-source développé par l’AMPLab de UC Berkeley permettant de traiter des bases de données massives en utilisant le calcul distribué, technique qui consiste à exploiter plusieurs unités de calcul réparties en clusters au profit d’un seul projet afin de diviser le temps d’exécution d’une requête. 
Spark a été développé en Scala et est au meilleur de ses capacités dans son langage natif. Cependant, la librairie PySpark propose de l’utiliser avec le langage Python, en gardant des performances similaires à des implémentations en Scala. Pyspark est donc une bonne alternative à la librairie pandas lorsqu’on cherche à traiter des jeux de données trop volumineux qui entraînent des calculs trop chronophages.* »

Comme nous le constatons, **pySpark** est un moyen de communiquer avec **Spark** via le langage **Python**.

**Spark**, quant à lui, est un outil qui permet de gérer et de coordonner l'exécution de tâches sur des données à travers un groupe d'ordinateurs.

<u>Spark (ou Apache Spark) est un framework open source de calcul distribué in-memory pour le traitement et l'analyse de données massives</u>.

Un autre [article très intéressant et beaucoup plus complet pour comprendre le **fonctionnement de Spark**](https://www.veonum.com/apache-spark-pour-les-nuls/), ainsi que le rôle des **Spark Session** que nous utiliserons dans ce projet.

<u>Voici également un extrait</u>:

*Les applications Spark se composent d’un pilote (« driver process ») et de plusieurs exécuteurs (« executor processes »). Il peut être configuré pour être lui-même l’exécuteur (local mode) ou en utiliser autant que nécessaire pour traiter l’application, Spark prenant en charge la mise à l’échelle automatique par une configuration d’un nombre minimum et maximum d’exécuteurs.*

![Schéma de Spark](../baseline/img/spark-schema.png)

*Le driver (parfois appelé « Spark Session ») distribue et planifie les tâches entre les différents exécuteurs qui les exécutent et permettent un traitement réparti. Il est le responsable de l’exécution du code sur les différentes machines.<br />
Chaque exécuteur est un processus Java Virtual Machine (JVM) distinct dont il est possible de configurer le nombre de CPU et la quantité de mémoire qui lui est alloué.<br />
Une seule tâche peut traiter un fractionnement de données à la fois.*

Dans les deux environnements (Local et Cloud) nous utiliserons donc **Spark** et nous l’exploiterons à travers des scripts python grâce à **PySpark**.

Dans la <u>version locale</u> de notre script nous **simulerons le calcul distribué** afin de valider que notre solution fonctionne.

Dans la <u>version cloud</u> nous **réaliserons les opérations sur un cluster de machines**.

## 2.2 Transfert Learning

L'énoncé du projet nous demande également de réaliser une première chaîne de traitement des données qui comprendra la **préparation des données** et une étape de **réduction de dimensionnalité**.

Il est également précisé qu'il n'est pas nécessaire d'entraîner un modèle pour le moment.

Nous décidons de partir sur une solution de **transfert learning**, qui consiste à utiliser un modèle pré-entraîné  (ici **MobileNetV2**) pour résoudre un problème similaire plus général et à l'adapter à notre problématique spécifique.

Nous allons fournir au modèle nos images, et nous allons <u>récupérer l'avant dernière couche</u> du modèle.

En effet la dernière couche de modèle est une couche **softmax** qui permet la classification des images ce que nous ne souhaitons pas dans ce projet.

L'avant dernière couche correspond à un **vecteur réduit** de dimension (1, 1, 1280).

Cela permettra de réaliser une première version du moteur pour la classification des images des fruits.

**MobileNetV2** a été retenu pour sa <u>rapidité d'exécution</u>, particulièrement adaptée pour le traitement d'un gros volume de données ainsi que la <u>faible dimensionnalité du vecteur de caractéristique en sortie</u> (1, 1, 1280)

# 3\. Déploiement de la solution en local

## 3.1 Environnement de travail

Pour des raisons de simplicité, nous développons dans un environnement Linux Ubuntu (exécuté depuis une machine Windows dans une machine virtuelle)
* Pour installer une machine virtuelle :  https://www.malekal.com/meilleurs-logiciels-de-machine-virtuelle-gratuits-ou-payants/


Compte tenu de l'usage dans l'entreprise de développer dans un environnement intégré de développement (IDE) VS Code sur Windows, nous avons profité de ce projet pilote pour comparer 3 solutions pour le développement et le test en local avant le déploiement dans le cloud :
1. Installation de Spark directement sur Windows, ce qui peux se faire avec la version 3.4.0 de Spark sans devoir nécessairement installer Winutils, l'interface Hadoop (HDFS) vers les fonctionnalités natives de Windows.
2. Installation de Spark sur un sous-système WSL Ubuntu 20.04, et mise en place d'une communication client-serveur en réseau local entre les deux environnements : déploiement depuis les notebooks Jupyter/VS Code/Windows de scripts PySpark vers le master Spark hébergé sur le système Ubuntu.
3. Réseau local physique avec cette fois-ci une communication en réseau privé avec une seconde machine équipé du seul système Ubuntu.
4. Utilisation des fonctionnalités Remote Development de VS Code, avec docker / ssh / ... 


## 3.2 Installation de Spark

[La première étape consiste à installer Spark ](https://computingforgeeks.com/how-to-install-apache-spark-on-ubuntu-debian/)

## 3.3 Installation des packages

On installe ensuite à l'aide du gestionnaire de paquets **pip** les packages qui nous seront nécessaires :

In [None]:
%pip install Pandas pillow tensorflow pyspark pyarrow

In [3]:
%pip show Pandas pillow tensorflow pyspark pyarrow

Name: pandas
Version: 2.0.0
Summary: Powerful data structures for data analysis, time series, and statistics
Home-page: 
Author: 
Author-email: The Pandas Development Team <pandas-dev@python.org>
License: BSD 3-Clause License

Copyright (c) 2008-2011, AQR Capital Management, LLC, Lambda Foundry, Inc. and PyData Development Team
All rights reserved.

Copyright (c) 2011-2023, Open source contributors.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

* Redistributions of source code must retain the above copyright notice, this
  list of conditions and the following disclaimer.

* Redistributions in binary form must reproduce the above copyright notice,
  this list of conditions and the following disclaimer in the documentation
  and/or other materials provided with the distribution.

* Neither the name of the copyright holder nor the names of its
  contributors may be used to endorse or promote

## 3.4 Import des librairies

In [1]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from keras.utils import img_to_array
from keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

### 📌 Description des librairies et fonctions utilisées

📌 **UDF** est l'acronyme de *User-Defined Function*.

Description en français :
* **`pandas`** : bibliothèque Python pour la manipulation et l'analyse de données.
* **`PIL (Python Imaging Library)`** : bibliothèque Python pour l'ouverture et la manipulation d'images.
* **`numpy`** : bibliothèque Python pour la manipulation de tableaux multidimensionnels.
* **`io`** : bibliothèque Python pour la gestion des entrées/sorties.
* **`os`** : bibliothèque Python pour l'interaction avec le système d'exploitation.
* **`tensorflow`** : bibliothèque open-source d'apprentissage automatique pour les données numériques et les réseaux de neurones.
* **`MobileNetV2`** : modèle de réseau de neurones pré-entraîné pour la classification d'images.
* **`preprocess_input`** : fonction de prétraitement pour les images avant l'utilisation de MobileNetV2.
* **`img_to_array`** : fonction pour convertir une image PIL en tableau numpy.
* **`Model`** : classe Keras pour la définition de modèles d'apprentissage en profondeur.
* **`pyspark`** : framework open-source pour le traitement de données en masse distribuées sur des clusters.
* **`col`** : fonction pour sélectionner une colonne dans un DataFrame Spark.
* **`pandas_udf`** : fonction pour exécuter une fonction Pandas sur un DataFrame Spark.
* **`PandasUDFType`** : enum pour spécifier le type de la fonction PandasUDF.
* **`element_at`** : fonction pour extraire l'élément d'une liste à une position donnée dans un DataFrame Spark.
* **`split`** : fonction pour diviser une chaîne en une liste de sous-chaînes en utilisant un délimiteur spécifié dans un DataFrame Spark.
* **`SparkSession`** : point d'entrée pour l'interaction avec les données dans Spark.

Description en anglais :
* **`pandas`**: Python library for data manipulation and analysis.
* **`PIL (Python Imaging Library)`**: Python library for opening and manipulating images.
* **`numpy`**: Python library for multidimensional array manipulation.
* **`io`**: Python library for managing input/output operations.
* **`os`**: Python library for interacting with the operating system.
* **`tensorflow`**: open-source machine learning library for numerical data and neural networks.
* **`MobileNetV2`**: pre-trained neural network model for image classification.
* **`preprocess_input`**: pre-processing function for images before using MobileNetV2.
* **`img_to_array`**: function to convert a PIL image to a numpy array.
* **`Model`**: Keras class for defining deep learning models.
* **`pyspark`**: open-source framework for processing distributed data on clusters.
* **`col`**: function to select a column in a Spark DataFrame.
* **`pandas_udf`**: function to execute a Pandas function on a Spark DataFrame.
* **`PandasUDFType`**: enum to specify the type of PandasUDF function.
* **`element_at`**: function to extract the element of a list at a given position in a Spark DataFrame.
* **`split`**: function to split a string into a list of sub-strings using a specified delimiter in a Spark DataFrame.
* **`SparkSession`**: entry point for interacting with data in Spark.

## 3.5 Définition des PATH pour charger les images <br /> et enregistrer les résultats

Dans cette version locale nous partons du principe que les données sont stockées dans le même répertoire que le notebook.

Nous n'utilisons qu'un extrait de **300 images** à traiter dans cette première version en local.

L'extrait des images à charger est stockée dans le dossier `data/im/sample_300`. <mark>**Test1**</mark>

Nous enregistrerons le résultat de notre traitement dans le dossier `tmp/prep/im/sample_300`. <mark>"**Results_Local**"</mark>

### 🚧 **TODO** 2 choses à faire ici :
1. Former l'extrait
2. Mettre en place mon env habituel (pepper.env)

Pour extraire le sample 300 ?

A priori, il faut prendre équitablement sur les 131 espèces (de fruits ET légumes). Donc, dans le cadre strict du projet, nous devrions ne prendre que les images de fruits. Mais clairement, c'est le type d'attention et de scrupule dont ils se f...

Il y a environ 150 exemplaires par catégorie.

L'apprentissage n'est pas supervisé, c'est du transfer learning.

Donc on n'a pas à conserver les labels donnés par les noms de dossiers.

Faisons une première version, il sera toujours temps d'améliorer ensuite.

In [2]:
from fruits.env import get_data_im_sample_300_dir, get_prep_im_sample_300_dir

print(get_data_im_sample_300_dir())
print(get_prep_im_sample_300_dir())

C:\Users\franc\Projects\pepper_cloud_based_model\data\im\sample_300
C:\Users\franc\Projects\pepper_cloud_based_model\tmp\prep\im\sample_300


In [3]:
from pepper.env import get_project_dir
from pepper.utils import _get_filenames_glob
import os
#C:\Users\franc\Projects\pepper_cloud_based_model\dataset\fruits-360_dataset\Test
raw_src_im_dir = os.path.join(get_project_dir(), r"dataset\fruits-360_dataset\Test")
display(raw_src_im_dir)
filenames = _get_filenames_glob(raw_src_im_dir, recursive=True)
display(filenames)

'C:\\Users\\franc\\Projects\\pepper_cloud_based_model\\dataset\\fruits-360_dataset\\Test'

['Apple Braeburn/321_100.jpg',
 'Apple Braeburn/322_100.jpg',
 'Apple Braeburn/323_100.jpg',
 'Apple Braeburn/324_100.jpg',
 'Apple Braeburn/325_100.jpg',
 'Apple Braeburn/326_100.jpg',
 'Apple Braeburn/327_100.jpg',
 'Apple Braeburn/32_100.jpg',
 'Apple Braeburn/33_100.jpg',
 'Apple Braeburn/34_100.jpg',
 'Apple Braeburn/35_100.jpg',
 'Apple Braeburn/36_100.jpg',
 'Apple Braeburn/37_100.jpg',
 'Apple Braeburn/38_100.jpg',
 'Apple Braeburn/39_100.jpg',
 'Apple Braeburn/3_100.jpg',
 'Apple Braeburn/40_100.jpg',
 'Apple Braeburn/41_100.jpg',
 'Apple Braeburn/42_100.jpg',
 'Apple Braeburn/43_100.jpg',
 'Apple Braeburn/44_100.jpg',
 'Apple Braeburn/45_100.jpg',
 'Apple Braeburn/46_100.jpg',
 'Apple Braeburn/47_100.jpg',
 'Apple Braeburn/48_100.jpg',
 'Apple Braeburn/49_100.jpg',
 'Apple Braeburn/4_100.jpg',
 'Apple Braeburn/50_100.jpg',
 'Apple Braeburn/51_100.jpg',
 'Apple Braeburn/52_100.jpg',
 'Apple Braeburn/53_100.jpg',
 'Apple Braeburn/54_100.jpg',
 'Apple Braeburn/55_100.jpg',
 'App

In [4]:
print(len(filenames))

22688


### 🚧 Sampling des images

In [14]:
from pepper.utils import create_if_not_exist
import os
import random
import shutil

def sample_images(source_dir: str, target_dir: str, n_samples: int):
    # Récupérer la liste des sous-dossiers
    subdirs = [
        subdir for subdir in os.listdir(source_dir)
        if os.path.isdir(os.path.join(source_dir, subdir))
    ]

    n_images_per_folder, remainder = divmod(n_samples, len(subdirs))
    
    for subdir in subdirs:
        subdir_path = os.path.join(source_dir, subdir)

        # Récupérer la liste des images dans le dossier
        images = [
            image for image in os.listdir(subdir_path)
            if os.path.isfile(os.path.join(subdir_path, image))
        ]

        # Sélectionner les images aléatoirement
        n_images = n_images_per_folder + (remainder > 0) 
        if len(images) <= n_images:
            selected_images = images
        else:
            selected_images = random.sample(images, n_images)
        
        if n_images == len(selected_images):
            remainder -= 1
        else:
            remainder += n_images - len(selected_images)

        if len(selected_images) > 0:
            create_if_not_exist(os.path.join(target_dir, subdir))
        
        # Copier les images sélectionnées vers le dossier cible
        for image in selected_images:
            source_path = os.path.join(subdir_path, image)
            target_path = os.path.join(target_dir, subdir, image)
            shutil.copyfile(source_path, target_path)


In [None]:
import os
from pepper.env import get_project_dir
from pepper.utils import create_if_not_exist
from fruits.storage_utils import sample_images
project_dir = get_project_dir()
raw_src_im_dir = os.path.join(project_dir, r"dataset\fruits-360_dataset\Test")
sample_300_im_dir = os.path.join(project_dir, r"data\im\sample_300")
create_if_not_exist(sample_300_im_dir)
n_images_per_folder = sample_images(raw_src_im_dir, sample_300_im_dir, 300)

In [3]:
n_samples = sum(n_images for subdir, n_images in n_images_per_folder.items())
display(n_samples)

300

### 🚧 Local vers S3, S3 vers S3

Compartiment `s3://pepper-labs-fruits`

On va d'abord effectuer une copie de la sélection depuis les dossiers locaux directement vers un compartiment S3.

Puis, par challenge, le faire directement entre le dépôt S3 en ligne des images et notre dépôt.

Enfin, il sera intéressant de ne même pas effectuer cette copie, mais d'aller prélever directement notre échantillon à la source (EMR utilisant le S3 officiel des images Fruits)

In [4]:
%pip install boto3

Collecting boto3
  Downloading boto3-1.26.133-py3-none-any.whl (135 kB)
                                              0.0/135.6 kB ? eta -:--:--
     -------------------------------------- 135.6/135.6 kB 4.0 MB/s eta 0:00:00
Collecting botocore<1.30.0,>=1.29.133 (from boto3)
  Downloading botocore-1.29.133-py3-none-any.whl (10.7 MB)
                                              0.0/10.7 MB ? eta -:--:--
     ---                                      1.1/10.7 MB 34.0 MB/s eta 0:00:01
     ----------                               2.7/10.7 MB 34.9 MB/s eta 0:00:01
     ----------------                         4.5/10.7 MB 35.6 MB/s eta 0:00:01
     -----------------------                  6.4/10.7 MB 37.2 MB/s eta 0:00:01
     --------------------------------         8.7/10.7 MB 39.7 MB/s eta 0:00:01
     --------------------------------------  10.7/10.7 MB 40.9 MB/s eta 0:00:01
     --------------------------------------- 10.7/10.7 MB 36.3 MB/s eta 0:00:00
Collecting jmespath<2.0.0,>=0.7.1

In [1]:
import boto3
s3 = boto3.resource("s3")
for bucket in s3.buckets.all():
    print(bucket.name)

pepper-bucket
pepper-labs-fruits


Old version de l'alternant

In [3]:
PATH = os.getcwd()
PATH_Data = PATH+'/data/Test1'
PATH_Result = PATH+'/data/Results_Local'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result_Local: '+PATH_Result)

PATH:        c:\Users\franc\Projects\pepper_cloud_based_model\notebooks
PATH_Data:   c:\Users\franc\Projects\pepper_cloud_based_model\notebooks/data/Test1
PATH_Result_Local: c:\Users\franc\Projects\pepper_cloud_based_model\notebooks/data/Results_Local


## 3.6 Création de la SparkSession

L’application Spark est contrôlée grâce à un processus de pilotage (driver process) appelé **SparkSession**.

<u>Une instance de **SparkSession** est la façon dont Spark exécute les fonctions définies par l’utilisateur dans l’ensemble du cluster</u>. <u>Une SparkSession correspond toujours à une application Spark</u>.

<u>Ici nous créons une session spark en spécifiant dans l'ordre</u> :

1. un **nom pour l'application**, qui sera affichée dans l'interface utilisateur Web Spark "**P8**"
2. que l'application doit s'exécuter **localement**.
    * Nous ne définissons pas le nombre de cœurs à utiliser (comme `.master('local[4])` pour 4 cœurs à utiliser),
    * nous utiliserons donc tous les cœurs disponibles dans notre processeur.
3. une option de configuration supplémentaire permettant d'utiliser le **format "parquet"** que nous utiliserons pour enregistrer et charger le résultat de notre travail.
4. vouloir **obtenir une session spark** existante ou si aucune n'existe, en créer une nouvelle

In [4]:
from pyspark.sql import SparkSession
spark = (
    SparkSession
    .builder
    .appName("Fruits")
    .master("local")
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    .getOrCreate()
)

In [5]:
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.sql.caseSensitive", "True")

# Obtention des paramètres de configuration
configurations = conf.getAll()

# Affichage des paramètres de configuration
for config in configurations:
    print(config)

('spark.app.submitTime', '1684166786102')
('spark.master', 'local[*]')
('spark.submit.pyFiles', '')
('spark.submit.deployMode', 'client')
('spark.sql.parquet.writeLegacyFormat', 'true')
('spark.app.name', 'Fruits')
('spark.ui.showConsoleProgress', 'true')
('spark.sql.caseSensitive', 'True')


Nous affectons à la variable **`sc`** le **`SparkContext`** attaché à l'objet **`spark`** :

In [6]:
sc = spark.sparkContext

<u>Affichage des informations de Spark en cours d'execution</u> :

In [7]:
spark

## 3.7 Traitement des données

<u>Dans la suite de notre flux de travail, nous allons successivement</u> :
1. Préparer nos données
    1. Importer les images dans un dataframe **pandas UDF**
    2. Associer aux images leur **label**
    3. Préprocesser en **redimensionnant nos images pour qu'elles soient compatibles avec notre modèle**
2. Préparer notre modèle
    1. Importer le modèle **MobileNetV2**
    2. Créer un **nouveau modèle** dépourvu de la dernière couche de MobileNetV2
3. Définir le processus de chargement des images et l'application de leur featurisation à travers l'utilisation de pandas UDF
3. Exécuter les actions d'extraction de features
4. Enregistrer le résultat de nos actions
5. Tester le bon fonctionnement en chargeant les données enregistrées

### 3.7.1 Chargement des données

Les images sont chargées au format binaire pour en faciliter le pré-traitement.



Avant de charger les images, nous spécifions que nous voulons charger uniquement les fichiers dont l'extension est **jpg**.

Nous indiquons également de charger tous les objets possibles contenus dans les sous-dossiers du dossier communiqué.

Code d'origine :
```python
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)
```

<u>Affichage des 5 premières images contenant</u> :
 - le path de l'image
 - la date et heure de sa dernière modification
 - sa longueur
 - son contenu encodé en valeur hexadécimal

<u>Je ne conserve que le **path** de l'image et j'ajoute une colonne contenant les **labels** de chaque image</u> :

In [8]:
from fruits.driver import init_spark_session, load_images
spark = init_spark_session()
images = load_images(spark)
images.printSchema()
images.select("path", "label").show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+---------------------------------------------------------------------------------------------------------------+----------+
|path                                                                                                           |label     |
+---------------------------------------------------------------------------------------------------------------+----------+
|file:/C:/Users/franc/Projects/pepper_cloud_based_model/dataset/fruits-360_dataset/Test/Watermelon/r_106_100.jpg|Watermelon|
|file:/C:/Users/franc/Projects/pepper_cloud_based_model/dataset/fruits-360_dataset/Test/Watermelon/r_109_100.jpg|Watermelon|
|file:/C:/Users/franc/Projects/pepper_cloud_based_model/dataset/fruits-360_dataset/Test/Watermelon/r_108_100.jpg|Watermelon|
|file:/C:/Users/franc/Projects/pepper_cloud_b

### 3.7.2 Préparation du modèle

Je vais utiliser la technique du **transfert learning** pour extraire les features des images.

J'ai choisi d'utiliser le modèle **MobileNetV2** pour sa rapidité d'exécution comparée à d'autres modèles comme *VGG16* par exemple.

Pour en savoir plus sur la conception et le fonctionnement de MobileNetV2, je vous invite à lire [cet article](https://towardsdatascience.com/review-mobilenetv2-light-weight-model-image-classification-8febb490e61c).

<u>Voici le schéma de son architecture globale</u> : 

![Architecture de MobileNetV2](../baseline/img/mobilenetv2_architecture.png)

Il existe une dernière couche qui sert à classer les images selon 1000 catégories que nous ne voulons pas utiliser.

L'idée dans ce projet est de récupérer le **vecteur de caractéristiques de dimensions `(1, 1, 1280)`** qui servira, plus tard, au travers d'un moteur de classification à reconnaître les différents fruits du jeu de données.

Comme d'autres modèles similaires, **MobileNetV2**, lorsqu'on l'utilise en incluant toutes ses couches, attend obligatoirement des images de dimension `(224, 224, 3)`. Nos images étant toutes de dimension `(100, 100, 3)`, nous devrons simplement les **redimensionner** avant de les confier au modèle.

<u>Dans l'odre</u> :
 1. Nous chargeons le modèle **MobileNetV2** avec les poids **précalculés** issus d'**imagenet** et en spécifiant le format de nos images en entrée
 2. Nous créons un nouveau modèle avec:
  - <u>en entrée</u> : l'entrée du modèle MobileNetV2
  - <u>en sortie</u> : l'avant dernière couche du modèle MobileNetV2

In [9]:
from keras.applications.mobilenet_v2 import MobileNetV2
model = MobileNetV2(
    weights="imagenet",
    include_top=True,
    input_shape=(224, 224, 3)
)

In [10]:
from keras import Model
new_model = Model(
    inputs=model.input,
    outputs=model.layers[-2].output
)

Affichage du résumé de notre nouveau modèle où nous constatons que <u>nous récupérons bien en sortie un vecteur de dimension (1, 1, 1280)</u> :

In [11]:
# Affichage dégradé que je ne m'explique pas, par rapport à l'original :
# pas de lignes blanches séparatrices
# retour à la ligne en seconde colonne
# Vu la pression que j'ai au temps, pas le temps de regarder cela, mais je n'aime pas
display(new_model.summary())

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

None

In [None]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids. Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser ensuite les poids aux différents workeurs.

In [12]:
broadcast_weights = sc.broadcast(new_model.get_weights())

<u>Mettons cela sous forme de fonctions</u> :

### 🚧 **TODO** Pourquoi ça ne fonctionne pas sous cette forme ?

In [13]:
from fruits.model import init_keras_model
from fruits.driver import (
    init_spark_session,
    broadcast_model_weights
)

spark = init_spark_session()
model = init_keras_model()
broadcast_model_weights(spark, model)

### Version d'origine

```python
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(
        weights='imagenet',
        include_top=True,
        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(
        inputs=model.input,
        outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model
```

Avec juste quelques renommages.

Notons que broadcast est passé comme variable globale

In [14]:
def init_keras_model() -> Model:
    """Returns a MobileNetV2 model with the top layer removed
    
    Returns
    -------
    Model
        The initialized model.
    """
    # Create a MobileNetV2 model with pre-trained weights
    model = MobileNetV2(
        weights="imagenet",
        include_top=True,
        input_shape=(224, 224, 3)
    )
    
    # Set all layers in the model as non-trainable
    for layer in model.layers:
        layer.trainable = False
        
    # Create a new model without the top layer
    new_model = Model(
        inputs=model.input,
        outputs=model.layers[-2].output
    )
    
    # Broadcast the model weights
    new_model.set_weights(broadcast_weights.value)
    
    # Return the model
    return new_model

### 3.7.3 Définition du processus de chargement des images et application <br/>de leur featurisation à travers l'utilisation de pandas UDF

Ce notebook définit la logique par étapes, jusqu'à Pandas UDF.

<u>L'empilement des appels est la suivante</u> :

* Pandas UDF
    * extraire les caractéristiques de la collection d'images (`pandas.Series`) 
    * pré-traiter une image

### 🚧 **TODO** Pourquoi ça ne fonctionne pas sous cette forme ?

Idem, ces fonctions sont retravaillées, renommées et déplacées dans `fruits.executor` :

In [1]:
from fruits.executor import (
    preprocess_img,
    extract_image_features,
    extract_image_features_udf
)

### Version d'origine

```python
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
```

Avec quelques renommages :

In [17]:
from typing import Union, Any, Iterator
import pandas as pd
import numpy as np
from pyspark.sql.functions import pandas_udf  # , PandasUDFType : Spark 3.4.0

def preprocess_img(content: Union[bytes, bytearray]) -> Any:
    """Pre-processes raw image bytes for prediction.

    Parameters
    ----------
    content : Union[bytes, bytearray]
        Raw image bytes.

    Returns
    -------
    Any
        Pre-processed image data.

    Raises
    ------
    ValueError
        If the provided content is not valid image bytes.
    """
    try:
        # Open the image from raw bytes and resize it
        img = Image.open(io.BytesIO(content)).resize([224, 224])
        # Convert the image to an array
        arr = img_to_array(img)
        return preprocess_input(arr)
    except Exception as e:
        raise ValueError(
            "Invalid image content. Please provide valid image bytes."
        ) from e


def extract_image_features(
    model: Model,
    content_series: pd.Series
) -> pd.Series:
    """Extracts image features from a pd.Series of raw images using the input
    model.

    Parameters
    ----------
    model : Model
        The pre-trained model used for feature extraction.
    content_series : pd.Series
        The pd.Series containing raw image data.

    Returns
    -------
    pd.Series
        The pd.Series containing the extracted image features.

    Note
    ----
    The function assumes that the `preprocess_img` function is defined separately.

    Raises
    ------
    ValueError
        If the model is not a valid TensorFlow Keras model.
    """
    try:
        # Preprocess the images in the content_series
        prep_imgs = np.stack(content_series.map(preprocess_img))
        # Extract the features using the model
        feats = model.predict(prep_imgs)
        # Flatten the feature tensors to vectors
        flat_feats = [f.flatten() for f in feats]
        return pd.Series(flat_feats)
    except Exception as e:
        raise ValueError(
            "Invalid model. Please provide a valid TensorFlow Keras model."
        ) from e


# See : https://www.databricks.com/blog/2020/05/20/new-pandas-udfs-and-python-type-hints-in-the-upcoming-release-of-apache-spark-3-0.html
@pandas_udf("array<float>")  # , PandasUDFType.SCALAR_ITER) = warning
def extract_image_features_udf(
    content_series_iter: Iterator[pd.Series]
) -> Iterator[pd.Series]:
    """This method is a Scalar Iterator pandas UDF wrapping our
    `extract_image_features` function. The decorator specifies that this
    returns a Spark DataFrame column of type ArrayType(FloatType).

    Parameters
    ----------
    content_series_iter : Iterator[pd.Series]
        An iterator over batches of data, where each batch is a pandas Series
        of image data.
    Yields
    ------
    Iterator[pd.Series]
        An iterator over the extracted image features for each batch of data.
    """
    # With Scalar Iterator pandas UDFs, we can load the model once and then
    # re-use it for multiple data batches. This amortizes the overhead of
    # loading big models.
    # model_weights = spark.sparkContext.broadcast(broadcast_weights.value)
    model = init_keras_model()
    # model.set_weights(model_weights.values)
    for content_series in content_series_iter:
        yield extract_image_features(model, content_series)

### 3.7.4 Exécution des actions d'extraction de features

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), peuvent rencontrer des erreurs de type `Out Of Memory` (OOM). Si vous rencontrez de telles erreurs dans la cellule ci-dessous, essayez de réduire la taille du lot Arrow via `maxRecordsPerBatch`

Je n'utiliserai pas cette commande dans ce projet et je laisse donc la commande en commentaire.

In [None]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.

<u>REMARQUE</u> : Cela peut prendre beaucoup de temps, tout dépend du volume de données à traiter.

Notre jeu de données de **Test** contient **22 819 images**.

Cependant, dans l'exécution en mode **local**, nous <u>traiterons un ensemble réduit de **330 images**</u>.

### 🚧 **TODO** Pourquoi ça ne fonctionne pas sous cette forme ?

In [2]:
from fruits.driver import init_spark_session, load_images

spark = init_spark_session()
images = load_images(spark)

### Version d'origine

In [18]:
from fruits.executor import extract_image_features_udf
from pyspark.sql.functions import col

#features_df = images.repartition(20).select(
im_feats = images.repartition(20).select(
    col("path"), col("label"),
    extract_image_features_udf("content").alias("features")
)

Rappel du chemin du répertoire où seront enregistrés nos résultats en plusieurs fichiers au format **`parquet`**.

Nos résultats se présentent sous la forme d'un DataFrame à 3 colonnes :
 1. `path` le chemin de l'image
 2. `label` la classe de l'image
 3. `features`, le vecteur de caractéristiques de l'image

In [None]:
print(PATH_Result)

/home/walduch/Documents/P8/data/Results


<u>Enregistrement des données traitées au format "**parquet**"</u> :

In [19]:
display(im_feats)

DataFrame[path: string, label: string, features: array<float>]

In [20]:
from pepper.env import get_tmp_dir
import os
im_feats_pqt_path = os.path.join(get_tmp_dir(), "im_feats.pqt")
display(im_feats_pqt_path)

'C:\\Users\\franc\\Projects\\pepper_cloud_based_model\\tmp\\im_feats.pqt'

In [22]:
spark

In [21]:
im_feats.write.mode("overwrite").parquet(im_feats_pqt_path)

Py4JJavaError: An error occurred while calling o98.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 4.0 failed 1 times, most recent failure: Lost task 0.0 in stage 4.0 (TID 841) (host.docker.internal executor driver): org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/C:/Users/franc/Projects/pepper_cloud_based_model/tmp/im_feats.pqt.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:788)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\franc\Projects\pepper_cloud_based_model\modules\fruits\executor.py", line 93, in <module>
    @pandas_udf("array<float>")  # , PandasUDFType.SCALAR_ITER) = warning
     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\pandas\functions.py", line 461, in _create_pandas_udf
    return _create_udf(f, returnType, evalType)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\udf.py", line 82, in _create_udf
    return udf_obj._wrapped()
           ^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\udf.py", line 431, in _wrapped
    wrapper.returnType = self.returnType  # type: ignore[attr-defined]
                         ^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\udf.py", line 236, in returnType
    self._returnType_placeholder = _parse_datatype_string(self._returnType)
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 1211, in _parse_datatype_string
    sc = get_active_spark_context()
         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 201, in get_active_spark_context
    raise RuntimeError("SparkContext or SparkSession should be created first.")
RuntimeError: SparkContext or SparkSession should be created first.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 15 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:190)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:382)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:387)
	at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:360)
	at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:239)
	at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:789)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: [TASK_WRITE_FAILED] Task failed while writing rows to file:/C:/Users/franc/Projects/pepper_cloud_based_model/tmp/im_feats.pqt.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:788)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:420)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "C:\Users\franc\Projects\pepper_cloud_based_model\modules\fruits\executor.py", line 93, in <module>
    @pandas_udf("array<float>")  # , PandasUDFType.SCALAR_ITER) = warning
     ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\pandas\functions.py", line 461, in _create_pandas_udf
    return _create_udf(f, returnType, evalType)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\udf.py", line 82, in _create_udf
    return udf_obj._wrapped()
           ^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\udf.py", line 431, in _wrapped
    wrapper.returnType = self.returnType  # type: ignore[attr-defined]
                         ^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\udf.py", line 236, in returnType
    self._returnType_placeholder = _parse_datatype_string(self._returnType)
                                   ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\types.py", line 1211, in _parse_datatype_string
    sc = get_active_spark_context()
         ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "C:\Program Files\Spark\python\lib\pyspark.zip\pyspark\sql\utils.py", line 201, in get_active_spark_context
    raise RuntimeError("SparkContext or SparkSession should be created first.")
RuntimeError: SparkContext or SparkSession should be created first.

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:561)
	at org.apache.spark.sql.execution.python.PythonArrowOutput$$anon$1.read(PythonArrowOutput.scala:118)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:514)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage2.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:403)
	at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1563)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:410)
	... 15 more


In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

# **ICI UN POINT TRES DOULOUREUX**

Passé 6h30 sans avancer, à tourner en rond, rien ne fonctionne et surtout, je ne comprends pas le broadcasting.

## 3.8 Chargement des données enregistrées et validation du résultat

<u>On charge les données fraichement enregistrées dans un **DataFrame Pandas**</u> :

In [None]:
df = pd.read_parquet(PATH_Result, engine="pyarrow")

<u>On affiche les 5 premières lignes du DataFrame</u> :

In [None]:
df.head()

Unnamed: 0,path,label,features
0,file:/home/walduch/Documents/P8/data/Test1/App...,Apple Braeburn,"[0.86105645, 0.16019525, 0.0, 0.0, 0.0, 1.0233..."
1,file:/home/walduch/Documents/P8/data/Test1/Cle...,Clementine,"[0.45963708, 0.0, 0.0, 0.0, 0.036376934, 0.0, ..."
2,file:/home/walduch/Documents/P8/data/Test1/Cle...,Clementine,"[1.3859445, 0.04571251, 0.0, 0.0, 0.9309062, 0..."
3,file:/home/walduch/Documents/P8/data/Test1/App...,Apple Braeburn,"[1.7865905, 0.20313944, 0.0, 0.0, 0.41594356, ..."
4,file:/home/walduch/Documents/P8/data/Test1/App...,Apple Braeburn,"[0.81415516, 0.18681705, 0.0, 0.0, 0.0, 0.3806..."


<u>On valide que la dimension du vecteur de caractéristiques des images est bien de dimension 1280</u> :

In [None]:
df.loc[0, "features"].shape

(1280,)

Nous venons de valider le processus sur un jeu de données allégé en local où nous avons simulé un cluster de machines en répartissant la charge de travail sur différents cœurs de processeur au sein d'une même machine.

Nous allons maintenant généraliser le processus en déployant notre solution sur un réel cluster de machines et nous travaillerons désormais sur la totalité des $22\,819$ images de notre dossier `Test`.

# 4\. Déploiement de la solution sur le cloud

Nous avons pu valider le fonctionnement de notre solution sur un cluster local. Il s'agit à présent de le déployer sur une infrastructure *élastique*, c'est-à-dire capable de se redimensionner pour accompagner la montée en puissance et donc en charge de l'application.

**Attention**, *je travaille sous Linux avec une version Ubuntu, les commandes décrites ci-dessous sont donc réalisées  exclusivement dans cet environnement.*

Pour arrêter un choix d'architecture technique, répondons à ces 4 questions :
1. Quel prestataire de Cloud choisir ?
2. Quelles solutions de ce prestataire adopter ?
3. Où stocker nos données ?
4. Comment configurer nos outils dans ce nouvel environnement ?

## 4.1 Choix du prestataire cloud : AWS

Le leader incontesté du marché du cloud computing en son précurseur historique. L'offre **Amazon Web Services** (AWS) d'Amazon demeure à ce jour la plus large et la plus complète dans le domaine du cloud computing, même si ses challengers (Google, Microsoft, etc) n'ont pas à rougir de leur offre, en particulier sur le segment du Big Data.

D'autres acteurs proposent des offres plus spécialisées ou plus compétitives qu'il faut pouvoir envisager selon les spécificités de chaque projet. Mais en l'occurrence, notre projet est une application classique simple basée sur quelques librairies standard bien établies qui ne nécessite donc pas d'explorer de telles voies et peut se satisfaire de micro-services standards.

Nous choisissons donc AWS, pour cette raison pratique, pour la facilité et la rapidité de mise en oeuvre que nous lui connaissons déjà, ainsi que pour les possibilités avancées de contrôle des coûts et de modulations tarifaires offertes par ce prestataire (possibilité de choisir les modalités tarifaires les plus avantageuses selon le profil analytique de charge).

L'objectif immédiat est de pouvoir louer de la puissance de calcul à la demande (donc sans immobilisation), de déployer facilement en se déchargeant de la responsabilité de maintenance évolutive de l'infrastructure, de disposer d'un dimensionnement automatique transparent des ressources de service pour maintenir une continuité de service à mesure qu'augmente la base d'utilisateurs de l'application, de maîtriser les coûts induits à l'aide de tableaux de bord analytiques.

## 4.2 Choix de la solution technique : EMR

Classiquement (depuis le modèle OSI des années 70), le marketing IT segmente les offres par niveaux d'abstraction informatique et remanie le corpus pour renforcer l'idée de nouveauté.

Dans le domaine du *cloud computing*, où les ressources sont louées *as a Service* utilisé [*à la demande* (années 2000)](https://www.csoonline.com/article/2115856/ibm-s-on-demand-strategy.html), nous distinguerons essentiellement les deux niveaux auxquels nous pouvons nous déployer en tant que :
1. Sur une offre de solution **IaaS** (Infrastructure as a Service) : hébergement classique avec location de l'infrastructure.
2. Sur une offre de solution **PaaS** (Plateforme as a Service) : approche orientée micro-services, avec location des seuls espace de stockage et puissance de calcul, et donc avec une virtualisation (abstraction et découplage) totale des couches matérielles.

### Solution **IaaS**

Dans cette configuration **AWS** met à notre disposition des serveurs vierges (*instances EC2*) que nous pouvons directement administrer.

Avec une telle solution, nous pouvons reproduire pratiquement à l'identique la solution mise en œuvre localement sur notre machine.

Nous installons nous-mêmes l'ensemble des outils et dépendances dont nous avons besoin, puis nous soumettons notre script :
* Installation de **Spark**, **Java**, **Python**, **Jupyter Notebook**, et des **librairies complémentaires**
* Il nous faudra notamment veiller à configurer **chacune des machines (workers) du cluster**

|**Avantages**|**Inconvénients**|
|-|-|
|- **Liberté totale** de mise en œuvre de la solution<br/>- **Facilité de mise en œuvre** à partir d'un modèle qui s'exécute en local sur une machine Linux|- **Chronophage** : il est nécessité d'installer et de configurer toute la solution<br/>- Possible **problèmes d'installation** des outils (des problématiques qui n'existaient pas en local sur notre machine peuvent apparaître sur le serveur EC2)<br/>- Solution **non pérenne**, il faudra veiller à la mise à jour des outils et éventuellement devoir réinstaller Spark, Java etc.|

### Solution **PaaS**

La galaxie des micro-services **AWS** est l'une des plus riches de l'univers du *cloud*.

En particulier, Amazon nous propose une offre de plate-forme Big Data, son offre [EMR (*Elastic MapReduce*)](https://aws.amazon.com/fr/emr/) qui prend en charge Apache Spark, Hive, Presto et autres applications Big Data.


fournit énormément de services différents, dans l'un de ceux-là il existe une offre qui permet de louer des **instances EC2** avec des applications préinstallées et configurées : il s'agit du **service EMR**.
* **Spark** y sera déjà pré-installé
* Il est possible de demander l'installation de :
    * **Tensorflow** et **JupyterHub**
    * des **packages complémentaires**
* .. **sur l'ensemble des workers du cluster**.


|**Avantages**|**Inconvénients**|
|-|-|
|- **Facilité de mise en œuvre** : Il suffit de très peu de configuration pour obtenir un environnement parfaitement opérationnel<br/>- **Rapidité de mise en œuvre** : Une fois la première configuration réalisée, il est très facile et très rapide de recréer des clusters à l'identique qui seront disponibles presque instantanément (le temps d'instancier les serveurs soit environ 15/20 minutes)<br/>- **Solutions matérielles et logicielles optimisées** par les ingénieurs d'AWS : On sait que les versions installées vont fonctionner et que l'architecture proposée est optimisée<br/>- **Stabilité de la solution**<br/>- **Solution évolutive** : Il est facile d’obtenir à chaque nouvelle instanciation une version à jour de chaque package, en étant garanti de leur compatibilité avec le reste de l’environnement<br/>- **Maintenance de sécurisé** : Les éventuels patchs de sécurité seront automatiquement mis à jour à chaque nouvelle instanciation du cluster EMR.|<br/>- Peut-être un certain **manque de liberté** sur la version des packages disponibles ? Même si je n'ai pas constaté ce problème.|

### Choix retenu

Nous sommes actuellement en phase d'amorçage du déploiement de l'application sur une infrastructure capable de monter en charge.

La bonne solution est évidemment de nous focaliser sur un déploiement rapide sur une plate-forme qui nous assure la **continuité de service** et la *scalabilité* sans nous faire courir les risques et les coûts induits par une approche *IaaS*.

Il sera toujours possible d'opter, après une première phase d'observation de la production, pour une architecture technique plus *à façon* et éventuellement basée sur l'offre *IaaS*, mais bien plus probablement basée sur une intégration horizontale de micro-services *PaaS* alternatifs ou complémentaires à *EMR*.

## 4.3 Choix de la solution de stockage des données : Amazon S3

Nous pourrions être tentés de stocker nos données d'application sur l'espace de nos serveurs **EC2** sous-jacents. En effet, un principe fondamental des approches Big Data est d'assurer autant que possible la *collocalisation* des données et des traitements qui s'y rapportent.

Mais cela pose trois problèmes que nous préférons éviter :
1. Ce mode de stockage des données est **plus onéreux** que l'utilisation de services de stockage spécialisés.
2. En cas de résiliation d'une instance EMR, pour cause d'inactivité (un EMR coûte, même s'il n'est pas utilisé), ou de simple bascule d'une instance sur une autre, à défaut d'une solution supplémentaire de *backup*, les données seraient perdues.
3. Nous nous exposons à des **risques de ralentissements voire dysfonctionnements de service** en raison du risque de saturation de l'espace disponible, limité. 

**saturer** l'espace disponible de nos serveurs (ralentissements, dysfonctionnements).

Amazon propose un service de stockage des données, [**S3** (*Simple Storage Service*)](https://aws.amazon.com/fr/s3/), qui nous permet de nous affranchir de ces 3 problématiques :
* Coût de stockage compétitif.
* Persistence découplée du cycle de vie des instances EMR.
* Espace **illimité** (certes c'est une vue de l'esprit, mais disons que le risque de saturation est proche de 0).

En outre, en faisant le choix d'un service de stockage AWS, et en prenant notamment soin de choisir la même région (c'est-à-dire le même Data Center ou bien des Data Centers géographiquement proches) pour nos serveurs **EC2** et **S3**, nous nous assurons de minimiser la latence dans l'accès aux données depuis l'EMR.

<mark>?? De plus, comme nous le verrons <u>il est possible d'accéder aux données sur **S3** de la même manière que l'on **accède aux données sur un disque local**</u> -> ?? accès séquentiel, non ?</mark>

L'ensemble des données sera stocké dans le compartiment sécurisé (un *bucket* non accessible de l'Internet) **`s3://pepper-labs-fruits`**.

## 4.4 Configuration de l'environnement de travail

La première étape est de créer un utilisateur distinct du compte racine (Root), pour des raisons évidentes de sécurité.

Cet utilisateur doit disposer de droits suffisants pour opérer comme développeur et comme administrateur des services visés, donc disposer d'un contrôle total sur les services EMR et S3.

La création des utilisateurs réguliers, des groupes (par exemple *Developers*), la définition et l'attribution des droits, s'effectue depuis un service incontournable d'AWS, [**IAM** (Identity and Access Management)](https://aws.amazon.com/fr/iam/).

Nous choisissons, pour l'utilisateur *Root*, comme pour l'utilisateur régulier, *Pepper*, la mise en place d'une authentification [**MFA** (Multi-Factor Authentication)](https://aws.amazon.com/fr/iam/features/mfa/) comme barrière à l'entrée de la console AWS, ce qui nous assure le meilleur niveau de sécurité. En effet, dans le cadre d'une architecture cloud, le détournement d'un compte, en particulier un compte qui dispose des droits les plus étendus, pourrait avoir des conséquences dramatiques.

Pour accéder ensuite à distance, en ligne de commande, ou encore programmatiquement via les APIs d'AWS (boto3) aux services AWS sans passer par l'interface Web de la console AWS, il est nécessaire de générer des paires de clés publique/privée (RSA), chaque utilisateur devant prendre le plus grand soin à conserver sa clé privée secrète et donc à ne la communiquer sous aucun prétexte à qui que ce soit (sûreté).

Cette clé privée sera le sésame pour ouvrir les différentes serrures (commande AWS, accès SSH si nécessaires (par exemple à nos instances EC2), déchiffrement de mot de passe administrateur (par exemple si nous installions un serveur Windows), interactions programmatiques via l'API, etc). Notons qu'il est possible de multiplier ces clés, et même de les attribuer pour un usage unique, afin de renforcer la sécurité, mais nous laissons à l'équipe des Administrateurs Système le soin d'aligner la configuration d'IAM avec l'annuaire et les politiques de l'entreprise.

Pour ce qui les manipulations que nous allons effectuer dans la suite, il sera pratique d'utiliser [**AWS CLI**](https://aws.amazon.com/fr/cli/), l'interface en ligne de commande d'AWS, qui permet d'interagir directement depuis un terminal (Linux ou PowerShell) avec les différents services d'AWS**. Cela nous permettra par exemple de lister le contenu de notre compartiment S3, d'en télécharger ou d'y téléverser des fichiers, avec la même facilité que si ces fichiers étaient locaux.

## 4.5 Upload de nos données sur S3

Nos outils sont configurés.

Il faut maintenant de téléverser nos données de travail sur Amazon S3.

Ces données sont les images contenues dans le répertoire `Test` du jeu de données téléchargé sur [**Kaggle**](https://www.kaggle.com/moltean/fruits/download).

Créons le compartiment S3 :

```sh
$ aws s3 mb s3://pepper-labs-fruits
```

Vérifions qu'il a bien été créé :

```sh
$ aws s3 ls
2023-05-15 20:31:10 pepper-labs-fruits
```

Copions à présent le contenu du dossier `Test` dans un répertoire `Test` sur notre bucket `p8-data` :

On se place à l'intérieur du répertoire `Test` et on synchronise les contenus local et distant à l'aide de la commande `sync` :

```sh
$ cd <fruits-360_dataset_path>/Test
$ aws sync . s3://pepper-labs-fruits/Test
```

Nos données de projet sont à présent disponibles sur Amazon S3.

## 4.6 Configuration du serveur EMR

Une fois encore, le cours [Réalisez des calculs distribués sur des données massives / Déployez un cluster de calculs distribués](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308696-deployez-un-cluster-de-calculs-distribues) <br /> détaille l'essentiel des étapes pour lancer un cluster avec **EMR**.

Réécrire les références 1/ en plus dense 2/ en plus 'à la source'

----

Je détaillerai ici les étapes particulières qui nous permettent de configurer le serveur selon nos besoins :
1. Cliquez sur `Créer un cluster` :
![Créer un cluster](../baseline/img/EMR_creer.png)

2. Cliquez sur `Accéder aux options avancées` :
![Créer un cluster](../baseline/img/EMR_options_avancees.png)

### 4.6.1 Étape 1 : Logiciels et étapes

#### 4.6.1.1 Configuration des logiciels

Sélectionnez les packages dont nous aurons besoin comme dans la capture d'écran :
1. Nous sélectionnons la dernière version d'`EMR`, soit la version `6.3.0` au moment où je rédige ce document
2. Nous cochons bien évidement `Hadoop` et `Spark` qui seront préinstallés dans leur version la plus récente
3. Nous aurons également besoin de `TensorFlow` pour importer notre modèle et réaliser le *transfert learning*
4. Nous travaillerons enfin avec un *notebook `Jupyter`* via l'application `JupyterHub`
  * Comme nous le verrons dans un instant nous allons <u>paramétrer l'application afin que les notebooks</u>,
  * comme le reste de nos données de travail, <u>soient enregistrés directement sur S3</u>.
  
![Créer un cluster](../baseline/img/EMR_configuration_logiciels.png)

#### 4.6.1.2 Modifier les paramètres du logiciel

Paramétrez la persistance des notebooks créés et ouvert via JupyterHub :

On peut à cette étape effectuer des demandes de paramétrage particulières sur nos applications.

L'objectif est, comme pour le reste de nos données de travail, d'éviter toutes les problématiques évoquées précédemment.

C'est l'objectif à cette étape, <u>nous allons enregistrer et ouvrir les notebooks</u> non pas sur l'espace disque de  l'instance EC2 (comme ce serait le cas dans la configuration par défaut de JupyterHub) mais <u>directement sur **Amazon S3**</u>.

Deux solutions sont possibles pour réaliser cela :
1. Créer un **fichier de configuration JSON** que l'on **upload sur S3** et on indique ensuite le chemin d’accès au fichier JSON
2. Rentrez directement la configuration au format JSON
 
J'ai personnellement créé un fichier JSON lors de la création de ma première instance EMR, puis lorsqu'on décide de cloner notre serveur pour en recréer un facilement à l'identique, la configuration du fichier JSON se retrouve directement copié comme dans la capture ci-dessous.

<u>Voici le contenu de mon fichier JSON</u> :

```sh
[
    {
        "classification": "jupyter-s3-conf",
        "properties": {
            "s3.persistence.bucket": "pepper-labs-fruits",
            "s3.persistence.enabled": "true"
        }
    }
]
```

Appuyez ensuite sur "**Suivant**"

![Modifier les paramètres du logiciel](../baseline/img/EMR_parametres_logiciel.png)

### 4.6.2 Étape 2 : Matériel

A cette étape, laissez les choix par défaut.

L'important ici est la sélection des instances :

1. Nous choisissons des instances de type `M5` qui sont des **instances de type équilibrés**
2. Nous choisissons le type `xlarge` qui est l'instance la **moins onéreuse disponible** 
3. Nous sélectionnons 1 instance **Maître** (le *pilote*) et 2 instances **Principales** (les *travailleurs*) soit 3 instances EC2.

Références :
* [Instances M5 Amazon EC2](https://aws.amazon.com/fr/ec2/instance-types/m5/)

![Choix du materiel](../baseline/img/EMR_materiel.png)

### 4.6.3 Étape 3 : Paramètres de cluster généraux

#### 4.6.3.1 Options générales

La première chose à faire est de donner un nom au cluster.

Pour des raisons pratiques, j'ai également décoché `Protection de la résiliation`.
    
![Nom du Cluster](../baseline/img/EMR_nom_cluster.png)

#### 4.6.3.2 Actions d'amorçage

Nous allons à cette étape **choisir les packages manquants à installer** qui sont indispensables pour exécuter notre notebook.

L'avantage de réaliser cette étape maintenant est que les packages installés le seront sur l'ensemble des machines du cluster.

La procédure pour créer le fichier **bootstrap** qui contient l'ensemble des instructions permettant d'installer tous les packages dont nous aurons besoin est expliqué dans le cours [Réalisez des calculs distribués sur des données massives / Bootstrapping](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308696-deployez-un-cluster-de-calculs-distribues#/id/r-4356490)

Nous créons donc un fichier nommé `bootstrap-emr.sh` que nous téléversons sur S3 (à la racine du compartiment `pepper-bucket`) et nous l'ajoutons comme indiqué dans la capture d'écran ci-dessous:

```sh
$ aws s3 cp .\bootstrap-emr.sh s3://pepper-bucket
upload: .\bootstrap-emr.sh to s3://pepper-bucket/bootstrap-emr.sh
```

![Actions d’amorçage](../baseline/img/EMR_amorcage.png)

Voici le contenu du fichier `bootstrap-emr.sh`

Il s'agit d'une séquence de commandes `pip install` pour installer les bibliothèques manquantes comme réalisé en local.

Il est nécessaire de réaliser ces actions à cette étape pour que les packages soient installés sur l'ensemble des machines du cluster et non pas uniquement sur le driver, comme cela serait le cas si nous exécutions ces commandes directement dans le notebook JupyterHub ou dans la console EMR (connecté au driver).

Ces actions d'amorçage sont exécutées avant l'installation des applications et avant qu'Amazon EMR ne commence à traiter les données. En cas d'ajout de nouveaux nœuds à un cluster en cours d'exécution, ces actions d'amorçage seront également exécutées sur ces nouveaux nœuds.

En revanche, l'action d'amorçage n'est effectuée qu'une fois sur chaque nœud. Pour modifier la configuration d'amorçage, il faut donc résilier l'instance EMR et lancer un nouveau cluster. 

```sh
#!bin/bash
sudo python3 -m pip install -U setuptools
sudo python3 -m pip install -U pip
sudo python3 -m pip install wheel
sudo python3 -m pip install pillow
sudo python3 -m pip install pandas
sudo python3 -m pip install pyarrow
sudo python3 -m pip install boto3
sudo python3 -m pip install s3fs
sudo python3 -m pip install fsspec
```


* **`setuptools`** et **`pip`** doivent être mis à jour pour éviter un problème avec l'installation de **`pyarrow`**.
* **`Pandas`** a eu droit à une mise à jour majeure (`1.3.0`) il y a moins d'une semaine au moment de la rédaction de ce notebook, et cette nouvelle version de **`Pandas`** dépend d'une version plus récente de **`Numpy`** que la version installée par défaut (`1.16.5`) lors de l'initialisation des instances **EC2**. <u>Il ne semble pas possible d'imposer une autre version de Numpy que celle installé par défaut</u> même si on force l'installation d'une version récente de **Numpy** (en tout cas, ni simplement ni intuitivement).

La mise à jour étant très récente <u>la version de **Numpy** n'est pas encore mise à jour sur **EC2**</u> mais on peut imaginer que ce sera le cas très rapidement et il ne sera plus nécessaire d'imposer une version spécifique de **Pandas**.

En attendant, je demande <u>l'installation de l'avant dernière version de **Pandas (1.2.5)**</u>

On clique ensuite sur `Suivant`

### 4.6.4 Étape 4 : Sécurité

#### 4.6.4.1 Options de sécurité

A cette étape nous sélectionnons la **paire de clés EC2** créée précédemment.

Elle nous permettra de nous connecter en `ssh` à nos **instances EC2** sans avoir à saisir nos login et mot de passe.

Laissons les autres paramètres à leur valeur par défaut.

Cliquons enfin sur `Créer un cluster`.
 
![EMR Sécurité](../baseline/img/EMR_securite.png)

## 4.7 Instanciation du serveur

Il ne nous reste plus qu'à attendre que le serveur soit prêt.

Cette étape peut prendre entre **15 et 20 minutes**.

Plusieurs étapes s'enchaînent et on peut suivre l'évolution du statut du **cluster EMR** :

![Instanciation étape 1](../baseline/img/EMR_instanciation_01.png)
![Instanciation étape 2](../baseline/img/EMR_instanciation_02.png)
![Instanciation étape 3](../baseline/img/EMR_instanciation_03.png)

Lorsque le statut affiche en vert `"En attente"` cela signifie que l'instanciation s'est bien déroulée et que notre serveur est prêt à être utilisé. 

## 4.8 Création du tunnel SSH à l'instance EC2 (Maître)

### 4.8.1 Création des autorisations sur les connexions entrantes

Nous souhaitons maintenant pouvoir accéder aux applications :
* **JupyterHub**, pour l'exécution de notre notebook ;
* **Serveur d'historique Spark**, pour le suivi de l'exécution des tâches de notre script lorsqu'il sera lancé.
 
Ces applications ne sont accessibles que depuis le réseau local du pilote.

Pour y accéder nous devons donc **créer un tunnel SSH vers le pilote**.

Par défaut, ce driver se situe derrière un pare-feu qui bloque l'accès SSH.

Pour ouvrir le port `22` sur lequel écoute le serveur SSH, il faut modifier le **groupe de sécurité EC2 du driver**.

Cette étape est décrite dans le cours [Réalisez des calculs distribués sur des données massives / Lancement d'une application à partir du driver](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308696-deployez-un-cluster-de-calculs-distribues#/id/r-4356512): 

Sur la page de la console consacrée à EC2, dans l'onglet "Réseau et sécurité", cliquez sur "Groupes de sécurité".
Vous allez devoir modifier le groupe de sécurité d’ElasticMapReduce-Master. 

Dans l'onglet "Entrant", ajoutez une règle SSH dont la source est "N'importe où" (ou "Mon IP" si vous disposez d'une adresse IP fixe).

![Configuration autorisation ports entrants pour ssh](../baseline/img/EMR_config_ssh_01.png)

Une fois cette étape réalisée vous devriez avoir une configuration semblable à la mienne :

![Configuration ssh terminée](../baseline/img/EMR_config_ssh_02.png)

### 4.8.2 Création du tunnel ssh vers le Driver

On peut maintenant établir le **tunnel SSH** vers le **Pilote**.

Pour cela on récupère les informations de connexion fournis par Amazon depuis la page du service EMR / Cluster / onglet Récapitulatif en cliquant sur "**Activer la connexion Web**".

![Activer la connexion Web](../baseline/img/EMR_tunnel_ssh_01.png)

<u>On récupère ensuite la commande fournie par Amazon pour **établir le tunnel SSH**</u> :

![Récupérer la commande pour établir le tunnel ssh](../baseline/img/EMR_tunnel_ssh_02.png)

<u>Dans mon cas, la commande ne fonctionne pas telle</u> quelle et j'ai du **l'adapter à ma configuration**. <br />

La **clé ssh** se situe dans un dossier `.ssh` elle-même située dans mon **répertoire personnel** dont le symbole est, sous Linux, identifié par un tilde `~`.

Ayant suivi le cours [Réalisez des calculs distribués sur des données massives / Lancement d'une application à partir du driver](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives)

* j'ai choisi d'utiliser le port `5555` au lieu du `8157`, même si le choix n'est pas très important.
* j'ai également rencontré un <u>problème de compatibilité</u> avec l'argument `-N` (la liste des arguments et leur significations disponibles [ici](https://explainshell.com/explain?cmd=ssh+-L+-N+-f+-l+-D)) et j'ai décidé de simplement le supprimer.

<u>Finalement, j'utilise la commande suivante dans un terminal pour établir mon tunnel SSH (seule l'URL change d'une instance à une autre)</u> :

```sh
$ ssh -i ~/.ssh/p8-ec2.pem -D 5555 hadoop@ec2-35-180-91-39.eu-west-3.compute.amazonaws.com
```

<u>On inscrit `yes` pour valider la connexion et si la connexion est établie on obtient le résultat suivant</u> :

![Création du tunnel SSH](../baseline/img/EMR_connexion_ssh_01.png)

Nous avons **correctement établi le tunnel ssh avec le driver** sur le port `5555`.

### 4.8.3 Configuration de FoxyProxy

Une dernière étape est nécessaire pour accéder à nos applications, en demandant à notre navigateur d'emprunter le tunnel ssh.

J'utilise pour cela **FoxyProxy**.

[Une fois encore, vous pouvez utiliser le cours pour le configurer](https://openclassrooms.com/fr/courses/4297166-realisez-des-calculs-distribues-sur-des-donnees-massives/4308701-realisez-la-maintenance-dun-cluster#/id/r-4356554).

Sinon, ouvrez la configuration de **FoxyProxy** et <u>cliquez sur **Ajouter**</u> en haut à gauche puis renseigner les éléments comme dans la capture ci-dessous :

![Configuration FoxyProxy Etape 1](../baseline/img/EMR_foxyproxy_config_01.png)

<u>On obtient le résultat ci-dessous</u> :

![Configuration FoxyProxy Etape 2](../baseline/img/EMR_foxyproxy_config_02.png)

### 4.8.4 Accès aux applications du serveur EMR via le tunnel ssh

<u>Avant d'établir notre **tunnel ssh** nous avions ça</u> :

![avant tunnel ssh](../baseline/img/EMR_tunnel_ssh_avant.png)

<u>On active le **tunnel ssh** comme vu précédemment puis on demande à notre navigateur de l'utiliser avec **FoxyProxy**</u> :

![FoxyProxy activation](../baseline/img/EMR_foxyproxy_activation.png)

<u>On peut maintenant s'apercevoir que plusieurs applications nous sont accessibles</u> :

![avant tunnel ssh](../baseline/img/EMR_tunnel_ssh_apres.png)

## 4.9 Connexion au notebook JupyterHub

Pour se connecter à **JupyterHub** en vue d'exécuter notre **notebook**, il faut commencer par <u>cliquer sur l'application **JupyterHub**</u> apparue depuis que nous avons configuré le **tunnel ssh** et **foxyproxy** sur notre navigateur (actualisez la page si ce n’est pas le cas).

![Démarrage de JupyterHub](../baseline/img/EMR_jupyterhub_connexion_01.png)

On passe les éventuels avertissements de sécurité puis nous arrivons sur une page de connexion.

<u>On se connecte avec les informations par défaut</u> :
 - <u>login</u>: **jovyan**
 - <u>password</u>: **jupyter**
 
![Connexion à JupyterHub](../baseline/img/EMR_jupyterhub_connexion_02.png)

Nous arrivons ensuite dans un dossier vierge de notebook.

Il suffit d'en créer un en cliquant sur "**New**" en haut à droite.

![Liste et création des notebook](../baseline/img/EMR_jupyterhub_creer_notebooks.png)

Il est également possible d'en <u>uploader un directement dans notre **bucket S3**</u>.

Grace à la <u>**persistance** paramétrée à l'instanciation du cluster nous sommes actuellement dans l'arborescence de notre **bucket S3**</u>

![Notebook stockés sur S3](../baseline/img/EMR_jupyterhub_S3.png)

Je décide d'**importer un notebook déjà rédigé en local directement sur S3** et je l'ouvre depuis **l'interface JupyterHub**.

## 4.10 Exécution du code

Je décide d'exécuter cette partie du code depuis **JupyterHub hébergé sur notre cluster EMR**.

Pour ne pas alourdir inutilement les explications du **notebook**, je ne réexpliquerai pas les étapes communes que nous avons déjà vues dans la première partie où l'on a exécuté le code localement sur notre machine virtuelle Ubuntu.

<u>Avant de commencer</u>, il faut s'assurer d'utiliser le **kernel pyspark**.

**En utilisant ce kernel, une session spark est créée à l'exécution de la première cellule**.

Il n'est donc **plus nécessaire d'exécuter le code `spark = (SparkSession ...`** comme lors de l'exécution de notre notebook en local sur notre VM Ubuntu.

### 4.10.1 Démarrage de la session Spark

In [None]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1626050279029_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [None]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1626050279029_0001,pyspark,idle,Link,Link,✔


### 4.10.2 Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur.

### 4.10.3 Import des librairies

In [None]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.10.4 Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

#### Version d'origine de l'alternant

```python
>>> PATH = 's3://p8-data'
>>> PATH_Data = PATH+'/Test'
>>> PATH_Result = PATH+'/Results'
>>> print('PATH:        '+\
>>>       PATH+'\nPATH_Data:   '+\
>>>       PATH_Data+'\nPATH_Result: '+PATH_Result)
PATH:        s3://p8-data
PATH_Data:   s3://p8-data/Test
PATH_Result: s3://p8-data/Results
```

In [2]:
bucket_path = "s3://p8-data"
input_path = f"{bucket_path}/Test"
output_path = f"{bucket_path}/Results"
print("bucket_path:", bucket_path)
print(" input_path:", input_path)
print("output_path:", output_path)

bucket_path: s3://p8-data
 input_path: s3://p8-data/Test
output_path: s3://p8-data/Results


### 4.10.5 Traitement des données

#### 4.10.5.1 Chargement des données

In [None]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8-data/Test...|2021-07-03 09:00:08|  7353|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:08|  7350|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:08|  7349|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:08|  7348|[FF D8 FF E0 00 1...|
|s3://p8-data/Test...|2021-07-03 09:00:09|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> :

In [None]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------+----------+
|path                                      |label     |
+------------------------------------------+----------+
|s3://p8-data/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p8-data/Test/Watermelon/r_95_100.jpg |Watermelon|
+------------------------------------------+----------+
only showing top 5 rows

None

#### 4.10.5.2 Préparation du modèle

In [None]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

    8192/14536120 [..............................] - ETA: 0s

In [None]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
broadcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(broadcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### 4.10.5.3 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



#### 4.10.5.4 Exécutions des actions d'extractions de features

In [None]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
print(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://p8-data/Results

In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.10.6 Chargement des données enregistrées et validation du résultat

In [None]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                           path  ...                                           features
0    s3://p8-data/Test/Watermelon/r_174_100.jpg  ...  [0.0059991637, 0.44703647, 0.0, 0.0, 3.3713572...
1  s3://p8-data/Test/Pineapple Mini/128_100.jpg  ...  [0.0146466885, 4.080593, 0.055877004, 0.0, 0.0...
2  s3://p8-data/Test/Pineapple Mini/137_100.jpg  ...  [0.0, 4.9659867, 0.0, 0.0, 0.0, 0.0, 0.5144821...
3      s3://p8-data/Test/Watermelon/275_100.jpg  ...  [0.22511952, 0.07235509, 0.0, 0.0, 1.690149, 0...
4      s3://p8-data/Test/Watermelon/271_100.jpg  ...  [0.3286234, 0.18830013, 0.0, 0.0, 1.9123534, 0...

[5 rows x 3 columns]

In [None]:
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [None]:
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(22688, 3)

On peut également constater la présence des fichiers au format "**parquet**" sur le **serveur S3** :

![Affichage des résultats sur S3](../baseline/img/S3_Results.png)

## 4.11 Suivi de l'avancement des tâches avec le Serveur d'Historique Spark

Il est possible de voir l'avancement des tâches en cours avec le **serveur d'historique Spark**.

![Accès au serveur d'historique spark](../baseline/img/EMR_serveur_historique_spark_acces.png)

**Il est également possible de revenir et d'étudier les tâches qui ont été réalisé, afin de debugger, optimiser les futurs tâches à réaliser.**

Lorsque la commande `features_df.write.mode("overwrite").parquet(PATH_Result)` était en cours, nous pouvions observer son état d'avancement :

![Progression execution script](../baseline/img/EMR_jupyterhub_avancement.png)

Le **serveur d'historique Spark** nous permet une vision beaucoup plus précise de l'exécution des différentes tâche sur les différentes machines du cluster :

![Suivi des tâches spark](../baseline/img/EMR_SHSpark_01.png)

On peut également constater que notre cluster de calcul a mis un tout petit peu **moins de 8 minutes** pour traiter les **22 688 images**.

![Temps de traitement](../baseline/img/EMR_SHSpark_02.png)


## 4.12 Résiliation de l'instance EMR

Notre travail est maintenant terminé.

Le cluster de machines EMR est **facturé à la demande**, et nous continuons d'être facturé même lorsque les machines sont au repos.

Pour **optimiser la facturation**, il nous faut maintenant **résilier le cluster**.

Je réalise cette commande depuis l'interface AWS :

1. Commencez par **désactiver le tunnel ssh dans FoxyProxy** pour éviter des problèmes de **timeout**.
![Désactivation de FoxyProxy](../baseline/img/EMR_foxyproxy_desactivation.png)
2. Cliquez sur "**Résilier**"
![Cliquez sur Résilier](../baseline/img/EMR_resiliation_01.png)
3. Confirmez la résiliation
![Confirmez la résiliation](../baseline/img/EMR_resiliation_02.png)
4. La résiliation prend environ **1 minute**
![Résiliation en cours](../baseline/img/EMR_resiliation_03.png)
5. La résiliation est effectuée
![Résiliation terminée](../baseline/img/EMR_resiliation_04.png)



## 4.13 Cloner le serveur EMR (si besoin)

Si nous devons de nouveau exécuter notre notebook dans les mêmes conditions, il nous suffit de **cloner notre cluster** et ainsi en obtenir une copie fonctionnelle sous 15/20 minutes, le temps de son instanciation.

<u>Pour cela deux solutions</u> :
1. <u>Depuis l'interface AWS</u> :
    1. Cliquez sur "**Cloner**"
    ![Cloner un cluster](../baseline/img/EMR_cloner_01.png)
    2. Dans notre cas nous ne souhaitons pas inclure d'étapes
    ![Ne pas inclure d'étapes](../baseline/img/EMR_cloner_02.png)
    3. La configuration du cluster est recréée à l’identique.
        * On peut revenir sur les différentes étapes si on souhaite apporter des modifications
        * Quand tout est prêt, cliquez sur "**Créer un cluster**"
    ![Vérification/Modification/Créer un cluster](../baseline/img/EMR_cloner_03.png)
2. <u>En ligne de commande</u> (avec AWS CLI d'installé et de configuré et en s'assurant de s'attribuer les droits nécessaires sur le compte AMI utilisé)
    1. Cliquez sur "**Exporter AWS CLI**"
    ![Exporter AWS CLI](../baseline/img/EMR_cloner_cli_01.png)
    2. Copier/Coller la commande **depuis un terminal**
    ![Copier Coller Commande](../baseline/img/EMR_cloner_cli_02.png)

## 4.14 Arborescence du serveur S3 à la fin du projet

Pour information, voici **l'arborescence complète de mon bucket S3 p8-data** à la fin du projet :

*Par soucis de lisibilité, nous ne listons pas les 131 sous dossiers du répertoire "Test"*

```sh
1. Results/_SUCCESS
1. Results/part-00000-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00001-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00002-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00003-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00004-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00005-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00006-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00007-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00008-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00009-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00010-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00011-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00012-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00013-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00014-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00015-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00016-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00017-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00018-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00019-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00020-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00021-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00022-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Results/part-00023-2cc36f38-19ef-4d8a-a0d1-5ddb309b3894-c000.snappy.parquet
1. Test/
1. bootstrap-emr.sh
1. jupyter-s3-conf.json
1. jupyter/jovyan/.s3keep
1. jupyter/jovyan/P8_01_Notebook.ipynb
1. jupyter/jovyan/_metadata
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/file-perm.sqlite
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/html/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbconvert/templates/latex/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/nbsignatures.db
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.aws-editors-workspace-metadata/notebook_secret
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/Untitled-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/Untitled1-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/.ipynb_checkpoints/test3-checkpoint.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/Untitled.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/Untitled1.ipynb
1. jupyter/jovyan/e-5OTY4VKPDT21945FF6DN15E35/test3.ipynb
```

# 5\. Conclusion

Nous avons réalisé ce projet **en deux temps** en tenant compte des contraintes qui nous ont été imposées.

Nous avons **dans un premier temps développé notre solution en local** sur une machine virtuelle dans un environnement Linux Ubuntu.

La <u>première phase</u> a consisté à **installer l'environnement de travail Spark**.

**Spark** a un paramètre qui nous permet de travailler en local et nous permet ainsi de **simuler du calcul partagé** en considérant **chaque cœur d'un processeur comme un worker indépendant**.

Nous avons travaillé sur un plus **petit jeu de donnée**, l'idée était simplement de **valider le bon fonctionnement de la solution**.

Nous avons fait le choix de réaliser du **transfert learning** à partir du model **MobileNetV2**.

Ce modèle a été retenu pour sa **légèreté** et sa **rapidité d'exécution** ainsi que pour la **faible dimension de son vecteur en sortie**.

Les résultats ont été enregistrés sur disque en plusieurs partitions au format "**parquet**".

**La solution a parfaitement fonctionné en mode local**.

La <u>deuxième phase</u> a consisté à créer un **réel cluster de calculs**.

L'objectif était de pouvoir **anticiper une future augmentation de la charge de travail**.

Le meilleur choix retenu a été l'utilisation du prestataire de services **Amazon Web Services** qui nous permet de **louer à la demande de la puissance de calculs**, pour un **coût tout à fait acceptable**.

Ce service se nomme **EC2** et se classe parmi les offres **Infrastructure as a Service** (IaaS).

Nous sommes allez plus loin en utilisant un service de plus haut niveau (**Plateforme as a Service** PaaS) en utilisant le service **`EMR`** qui nous permet d'un seul coup d'**instancier plusieurs serveurs (un cluster)** sur lesquels nous avons pu demander l'installation et la configuration de plusieurs programmes et librairies nécessaires à notre projet comme **`Spark`**, **`Hadoop`**, **`JupyterHub`** ainsi que la librairie **`TensorFlow`**.

En plus d'être plus **rapide et efficace à mettre en place**, nous avons la **certitude du bon fonctionnement de la solution**, celle-ci ayant été préalablement validé par les ingénieurs d'Amazon.

Nous avons également pu installer, sans difficulté, **les packages nécessaires sur l'ensembles des machines du cluster**.

Enfin, avec très peu de modification, et plus simplement encore, nous avons pu **exécuter notre notebook comme nous l'avions fait localement**.

Nous avons cette fois-ci exécuté le traitement sur **l'ensemble des images de notre dossier "Test"**.

Nous avons opté pour le service **Amazon S3** pour **stocker les données de notre projet**.

S3 offre, pour un faible coût, toutes les conditions dont nous avons besoin pour stocker et exploiter de manière efficace nos données.

L'espace alloué est potentiellement **illimité**, mais les coûts seront fonction de l'espace utilisé.

Il nous sera **facile de faire face à une montée de la charge de travail** en **redimensionnant** simplement notre cluster de machines (horizontalement et/ou verticalement au besoin), les coûts augmenteront en conséquence mais resteront nettement inférieurs aux coûts engendrés par l'achat de matériels ou par la location de serveurs dédiés.