# Projet 8: Déployez un modèle dans le cloud

Fruits est une start-up de l'agri-tech qui a pour volonté de préserver la **biodiversité des fruits** en développant des **robots cueilleurs intelligents** qui appliqueraient des **traitements spécifiques à chaque espèce** de fruits lors de la récolte.

Pour se faire connaître auprès du grand public, elle souhaite mettre à sa disposition une **application mobile** qui permettrait aux utilisateurs de **prendre en photo** un fruit et d'obtenir des **informations** sur ce dernier. Trois principaux objectifs:
- sensibiliser le grand public à la biodiversité des fruits
- mettre en place une première version du moteur de classification des images de fruits
- construire une première version de l'architecture Big Data nécessaire

**Contraintes à prendre en compte:**
- le **volume des données va augmenter très rapidement** après la livraison de ce projet => architecture Big Data avec scripts en Pyspark
- compléter le script d'un **traitement de diffusion des poids du modèle Tensorflow sur les clusters**
- compléter le script d'une étape de **réduction de dimension de type PCA** en PySpark
- les **serveurs** doivent être situés sur le **territoire européen**
- faire un **retour critique** sur le choix de l'architecture Big Data choisie

Ce notebook contient les **scripts en PySpark exécutables** sur **Databricks** c'est à dire les **étapes de preprocessing** et la **réduction de dimension** de type PCA (Principal Component Analysis).

## Introduction au Big Data

Le **web 3.0** est celui de la **mobilité**, des **objets connectés** et des **données**. C'est un web sémantique où l'internaute est "fiché", notamment au travers de sa navigation et de ses différents profils sur les réseaux sociaux. 

L'**utilisation massive** d'internet et des objets connectés comme le téléphone, l'ordinateur, la montre, la balance etc engendre la **production de quantités astronomiques de données** ce qui pose des **problématiques de stockage et de traitement approprié** pour les entreprises qui les exploitent afin de prendre notamment des décisions stratégiques et concurrentielles.

Cette infographie offre une vision très parlante du monde de la donnée aujourd'hui:

![Data](files/tables/img/data_infographie.PNG)

A titre d'exemple, en 2009, 800 000 pétaoctets de données étaient produits, 35 zettaoctets en 2020 et plus de 180 zettaoctets de données prévus d'ici 2025 (*5 en 5 ans!).

Il faut donc des **technologies innovantes** capables de traiter:
- des volumes de données énormes et en constante augmentation
- des données provenant de sources multiples et de natures diverses
- des besoins analytiques vitaux à fournir dans des délais impartis

Le Big Data désigne le courant technologique que nous voyons émerger ces dernières années autour des données, des **mégadonnées que nous permettent de stocker aujourd’hui les serveurs**. Le Big Data vient du fait que les données de certaines entreprises ou institutions sont devenues tellement volumineuses que les outils techniques classiques de gestion, de requête sur les bases dites structurées et de traitement des données sont devenus obsolètes, avec des difficultés dans l’instanciation de celles-ci, les temps d’extraction, de traitement devenant trop long.

Le socle commun sur lequel à peu près tout le monde s’entend pour caractériser les problématiques de Big Data, ce sont les 4V : Volume, Vitesse, Variété et Véracité.
- **Volume** : des volumes de données énormes en constante augmentation
- **Vitesse** : des besoins analytiques importants à fournir dans des délais impartis
- **Varieté** : des données provenant de sources multiples et de natures diverses
- **Véracité** : fait référence à la fiabilité de la donnée, la qualité et la précision sont moins vérifiables

Le choix de l'**architecture de données** à mettre en place est donc essentiel.

## Choix d'utiliser Databricks

Plusieurs raisons m'ont poussé à transférer les traitements sur Databricks qui est un environnement SaaS (Software-as-a-Service) permettant d'accéder aux données et aux ressources de calcul:

![Databricks](files/tables/img/databricks_logo.PNG)


- **simplicité**: Databricks permet une seule architecture de données unifiée sur S3 (solution de stockage des données sur Amazon) pour l'analytique SQL, la data science et le machine learning
- **rapport performance / prix**: performances du data warehouse au prix d'un data lake grâce à des clusters de calcul optimisés par SQL
- **réputation**: des clients prestigieux comme HP, Nasdac, Hotels.com ont mis en œuvre Databricks sur AWS pour mettre à disposition une plateforme d'analytique révolutionnaire répondant à tous les cas d'usage de l'analytique et de l'IA.

Construire des modèles de ML est difficile et les mettre en production est encore plus difficile:
- la **diversité des frameworks de ML** complique la gestion des environnements de ML
- les **transferts entre équipes sont difficile**s en raison de la disparité des outils et des processus, de la préparation des données à l'expérimentation et à la production
- la difficulté de suivre les expériences, les modèles, les dépendances et les artefacts rend **difficile la reproduction des résultats**
- il y a des **risques liés à la sécurité et à la conformité**

Le maintien de la qualité des données et de la précision des modèles au fil du temps ne sont que quelques-uns des défis à relever. Databricks va **rationaliser le développement ML**, de la préparation des données à l'entraînement et au déploiement des modèles, à **grande échelle** tout en permettant la collaboration:
- **workspace**: un lieu central pour stocker et partager des notebooks, des expériences et des projets, avec un contrôle d'accès basé sur les rôles
- **notebooks** collaboratifs: prise en charge de plusieurs langues (R, Python, SQL, Scala) et le versionning intégré permettent aux équipes de partager et d'itérer sur le code plus rapidement
- **AutoML**: obtention de résultats plus rapides grâce à l'ajustement automatique des hyperparamètres et à la recherche de modèles avec les intégrations Hyperopt, Apache SparkTM et MLflow
- **Experiments Tracking**: suivre automatiquement les expériences et utiliser les visualisations intégrées pour voir et comparer les résultats de milliers d'essais, sélectionner les paramètres, les mesures et identifier le meilleur essai.
- **Model Registry**: enregistrer le modèle à mettre en production dans un seul endroit et gérer son cycle de vie de manière collaborative

## Monter un bucket AWS S3 sur Databricks

**L'analyse des données directement dans un entrepôt de données peut s'avérer coûteuse**, c'est pourquoi les entreprises recherchent d'autres plateformes capables de stocker et de traiter tout ou partie de leurs données. 

L'un de ces outils est **Amazon S3** qui est un service en ligne qui offre un **stockage flexible aux entreprises**. Son contrôle d'accès granulaire, son chargement de métadonnées et d'autres caractéristiques de ce type en font le premier choix de tous les analystes de données. Aujourd'hui, les entreprises **transfèrent des informations de Databricks vers S3 afin d'utiliser un espace de stockage évolutif à un prix inférieur**.

Pour monter un bucket AWS S3 sur Databricks, se référer au notebook suivant:
- Mount_AWS_S3_Bucket.ipynb

## Import des librairies

In [0]:
# Pyspark
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, countDistinct

# Python
from PIL import Image
import time
import pandas as pd
import numpy as np
import io

# Tensorflow
import tensorflow as tf

# Preprocessing images pour VGG16
from keras.applications.vgg16 import VGG16, preprocess_input
from keras.models import Model
from tensorflow.keras.preprocessing.image import img_to_array, load_img

## Chargement des données

Dans cette partie, j'ai enregistré sur l'espace de stockage de Databricks quelques dossiers de fruits comportants chacun 5 images afin de tester le script sur un jeu de données réduit. J'avais uploadé des images sur Amazon S3 dans un premier temps mais cela revenait cher. Je profite de l'essai gratuit de Databricks pendant 14 jours pour l'élaboration du script avant de l'appliquer sur les données présentes sur S3.

In [0]:
# Dossiers présents dans FileStore
dbutils.fs.ls("/FileStore/tables/")

In [0]:
# Vérification du dossier où sont stockées les images
dbutils.fs.ls("/FileStore/tables/data/test_S3/")

In [0]:
# Chargement des images au format image pour affichage
df_img_avocado = spark.read.format("image").load("/FileStore/tables/data/test_S3/Avocado/")

# Affichage des images
display(df_img_avocado)

Les différentes vues du fruit sont présentes dans des dossiers qui portent le nom du fruit. Nous allons **charger les données avec une extension.jpg** puis nous **rajouterons une colonne "Label"** correspondant au nom du dossier dans lequel se situent les images. Databricks recommande d'utiliser la source de données **fichier binaire** pour charger les données d'image dans le DataFrame Spark sous forme d'octets bruts.\
https://docs.databricks.com/external-data/image.html

Databricks prend en charge les fichiers binaires et convertit chacun d’eux en un enregistrement unique qui contient le **contenu brut et les métadonnées du fichier**. La source de données de fichier binaire produit un DataFrame avec les colonnes suivantes et éventuellement des colonnes de partition :
- **path (StringType)** : Chemin d'accès au fichier.
- **modificationTime (TimestampType)** : Heure de dernière modification de l'événement. Dans certaines implémentations Hadoop FileSystem, ce paramètre peut ne pas être disponible et la valeur est définie sur une valeur par défaut.
- **length (LongType)** : Longueur du fichier en octets.
- **content (BinaryType)** : Contenu du fichier.

In [0]:
# Chemin d'accès des données
path_data = "/FileStore/tables/data/test_S3/"

# lecture des fichiers jpg de manière récursive à partir du répertoire 
# d'entrée en ignorant la détection de la partition
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(path_data)

In [0]:
display(images.limit(5))

In [0]:
# 5 premières lignes du dataframe issu de la source de données de fichier binaire
images.show(5)

Nous aurons besoin du **chemin d'accès** aux images (colonne **path**) ainsi que d'une colonne contenant les **labels** de chaque image.

In [0]:
# Création colonne label
df_images = images.withColumn('label', element_at(split(images['path'], '/'),-2)) 
print(df_images.printSchema())

# Affichage des 5 premières lignes
df_images.show(5)

## Quelques statistiques

In [0]:
# Nombre d'images par label
df_images.groupBy("label").agg(countDistinct('path')).show()

In [0]:
# Calcul du nombre de labels
df_images.select(countDistinct("label")).show()
#images.select('label').distinct().count()

In [0]:
# Calcul du nombre d'images
df_images.select(countDistinct("path")).show()

## Preprocessing des données

Une série de modèles très performants ont été développés pour la classification d'images et démontrés lors du concours annuel ImageNet.
Ces modèles peuvent être utilisés comme base pour l'apprentissage par transfert dans les applications de vision par ordinateur pour plusieurs raisons:
- Les modèles ont appris à détecter les **caractéristiques génériques des photographies**, étant donné qu'ils ont été **entraînés sur plus de 14 000 000 d'images pour 1 000 catégories**.
- **Performances de pointe** : Les modèles ont atteint une performance de pointe et restent efficaces dans la tâche spécifique de reconnaissance d'images pour laquelle ils ont été développés.
- **Facilité d'accès** : Les poids des modèles sont fournis sous forme de fichiers téléchargeables gratuitement et de nombreuses bibliothèques fournissent des API pratiques pour télécharger et utiliser les modèles directement.

Il existe une douzaine de modèles de reconnaissance d'images très performants qui peuvent être téléchargés et utilisés comme base pour la reconnaissance d'images et les tâches connexes de vision par ordinateur.

Trois des modèles les plus populaires sont les suivants :
- VGG (par exemple, VGG16 ou VGG19)
- GoogLeNet (par ex. InceptionV3)
- Réseau résiduel (par exemple, ResNet50)

**Keras** donne accès à un certain nombre de modèles pré-entraînés très performants qui ont été développés pour des tâches de reconnaissance d'images.

Ils sont disponibles via l'API d'applications et comprennent des fonctions permettant de **charger un modèle avec ou sans les poids pré-entraînés**, et de **préparer les données** d'une manière qu'un modèle donné peut attendre (par exemple, la mise à l'échelle de la taille et des valeurs des pixels)\
https://keras.io/api/applications/vgg/

Le modèle pré-entraîné peut être utilisé comme un programme autonome pour extraire des caractéristiques de nouvelles photographies.

Ici nous allons exploiter le réseau de neurones pré-entrainé **VGG16 pour extraire les features** c’est-à-dire se servir des features du réseau pour représenter les images du nouveau problème. La dernière couche fully-connected sera retirée car nous ne souhaitons pas à ce stade effectuer la classification.

La taille d'entrée par défaut des images pour ce modèle est **224x224**. Chaque application Keras attend un type spécifique de prétraitement des entrées. Pour VGG16:
- conversion des images d'entrée de RGB à BGR
- centrage à zéro de chaque canal de couleur par rapport à l'ensemble de données ImageNet, sans mise à l'échelle \
Ce preprocessing est effectué par la fonction tf.keras.applications.vgg16.preprocess_input

Nous allons charger chaque photo au format binaire, télécharger et préparer le modèle comme un modèle d'extraction de caractéristiques (cad enlever la dernière couche) puis extraire ces caractéristiques de la photo chargée à l'aide d'un UDF pandas Scalar Iterator.

Nous allons reprendre le script de la documentation issue de Databricks en l'adaptant à un modèle VGG16: \
https://docs.databricks.com/machine-learning/preprocess-data/transfer-learning-tensorflow.html

### Préparation du modèle

Les workers Spark ont besoin d'accéder au modèle et à ses poids. Le modèle sera chargé dans le pilote Spark puis les poids seront ensuite diffusés aux workers. \
Au lieu de distribuer ces informations avec chaque tâche sur le réseau (surcharge et perte de temps), les variables de diffusion sont utilisées. Ce sont des **variables partagées en lecture seule** qui sont **mises en cache et disponibles sur tous les nœuds du cluster** afin que les tâches puissent y accéder ou les utiliser. PySpark distribue donc les variables de diffusion aux travailleurs à l'aide d'algorithmes de diffusion efficaces afin de **réduire les coûts de communication au lieu d'envoyer ces données avec chaque tâche**.

In [0]:
# Modèle VGG16
train_time = time.time()
# Chargement du modèle VGG16 et suppression de la dernière couche
model = VGG16(weights='imagenet', include_top=False)
# Vérification de la suppression de la dernière couche (predictions (Dense & Fully Connected))
model.summary()

In [0]:
# Broadcasting des poids (on récupère les poids du modèle cad imagenet => on passe en C plus bas niveau donc plus rapide)
bc_model_weights = sc.broadcast(model.get_weights())

def model_fn():
    """ Modèle VGG16 avec suppression de la dernière couche et broadcasting des poids """
    model = VGG16(weights=None, include_top=False)
    model.set_weights(bc_model_weights.value)
    return model

### Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

Le nouveau Scalar Iterator pandas UDF est utilisé afin **d'amortir le coût du chargement de grands modèles sur les workers**. Il permet des **opérations vectorisées** qui peuvent augmenter les performances jusqu'à 100 fois par rapport aux UDF Python ligne par ligne.

In [0]:
def preprocess(content):
    """ Preprocessing des images conservées sous forme 
    d'octets dans un cache en mémoire.
    
    Arguments:
    --------------------------------
    content: : image au format binaire
    
    return:
    --------------------------------
    Image redimensionnée et preprocessée au format array """
    
    # Taille d'entrée des images pour le VGG16: 224 * 224
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    # Format array
    arr = img_to_array(img)
    # Centrage à zéro de chaque canal de couleur par rapport à l'ensemble de données ImageNet, sans mise à l'échelle 
    return preprocess_input(arr)


def featurize_series(model, content_series):
    """ Extraction des caractéristiques des images au format Array
    par le modèle indiqué en entrée.
    
    Arguments:
    --------------------------------
    model: modèle d'extraction des features
    content_series: images redimensionnées et preprocessées au format array
    
    return: 
    --------------------------------
    caractéristiques de l'image au format pd.Series """
    
    # Parallélisation des calculs pour le preprocessing des images
    input = np.stack(content_series.map(preprocess))
    # Extraction des features
    preds = model.predict(input)
    # Pour certaines couches, les caractéristiques de sortie seront des tenseurs multidimensionnels.
    # Vectorisation des features tensors pour faciliter le stockage dans les Spark DataFrames
    output = [p.flatten() for p in preds]
    return pd.Series(output)

In [0]:
# Méthode renvoie une colonne Spark DataFrame de type ArrayType(FloatType).
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """ Fonction qui charge le modèle VGG16 avec suppression de la dernière couche et broadcasting des poids
    puis applique l'extraction de features de manière itérative aux images redimensionnées et preprocessées 
    au format array
    
    Arguments:
    --------------------------------
    content_series_iter: itérateur des images redimensionnées et preprocessées au format array
    
    return: 
    --------------------------------
    caractéristiques de l'image au format pd.Series """

  # Avec les Pandas UDF Scalar Iterator, nous pouvons charger le modèle une fois et le réutiliser ensuite
  # pour plusieurs lots de données.  Cela permet d'amortir les frais généraux liés au chargement de gros modèles
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

### Exécutions des actions d'extractions de features

In [0]:
# Les Pandas UDF sur des enregistrements de grande taille peuvent rencontrer des erreurs de type Out Of Memory.
# Si c'est le cas, tenter de réduire la taille de `maxRecordsPerBatch`:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [0]:
features_df = df_images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features"))

In [0]:
# Affichage du dataframe partitionné par hachage (16 partitions)
features_df.show()

## Réduction de dimension par ACP

Nous allons réaliser une ACP qui va permettre de **créer des features décorrélées entre elles**, de **diminuer leur dimension** tout en gardant un **niveau de variance expliquée élevée**. \
La réduction de dimension sera effectuée à l'aide de la documentation ci-dessous: \
https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/index.html \
https://github.com/apache/spark/blob/master/examples/src/main/python/ml/pca_example.py \
https://people.duke.edu/~ccc14/sta-663-2016/21D_Spark_MLib.html

In [0]:
# ACP
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import PCA

import matplotlib.pyplot as plt

In [0]:
# Extraction des features individuelles et assemblage en vecteur
# https://stackoverflow.com/questions/39025707/how-to-convert-arraytype-to-densevector-in-pyspark-dataframe

n = 22 # Size of features

assembler = VectorAssembler(
    inputCols=["features[{0}]".format(i) for i in range(n)], 
    outputCol="features_vector")

output = assembler.transform(features_df.select(
    "*", *(features_df["features"].getItem(i) for i in range(n))
))

In [0]:
# Standardisation (moyenne = 0 et écart type = 1)
scaler = StandardScaler(withMean=True, withStd=True,
                        inputCol='features_vector',
                        outputCol='std_features')

model = scaler.fit(output)
output = model.transform(output)

In [0]:
# ACP
pca = PCA(k=3, inputCol="std_features", outputCol="pcaFeatures")
pca = pca.fit(output)

In [0]:
def display_scree_plot(pca):
    """Fonction qui affiche l'éboulis des valeurs propres.

    Arguments:
    --------------------------------
    pca: acp, obligatoire
    
    return:
    --------------------------------
    None """
    
    scree = pca.explainedVariance*100
    plt.bar(np.arange(len(scree))+1, scree)
    plt.plot(np.arange(len(scree))+1, scree.cumsum(), c="red", marker='o')
    plt.xlabel("rang de l'axe d'inertie")
    plt.ylabel("pourcentage d'inertie")
    plt.title("Eboulis des valeurs propres")
    plt.show(block=False)

In [0]:
plt.figure(figsize=(10, 5))
display_scree_plot(pca)

Le graphique montre la quantité de variance capturée (sur l'axe des y) en fonction du nombre de composantes que nous incluons (sur l'axe des x). Une règle empirique consiste à préserver environ 80 % de la variance. Il faudrait donc garder **250 composantes**.

Nous effectuons l'ACP avec le nombre de composantes choisi ci-dessus.

In [0]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors

In [0]:
# Transform 60 features into MMlib vectors
assembler = VectorAssembler(
    inputCols="features",
    outputCol="features_vectors")

output = assembler.transform(features_df)

In [0]:
# Création d'un dataframe 

In [0]:
pca = PCA(k=3, inputCol="features", outputCol="pcaFeatures")
model = pca.fit(features_df.select('features'))

In [0]:
duration_vgg16 = np.round(time.time() - train_time,0)
print(f"Temps de traitement: {duration_vgg16} secondes")

In [0]:
PCA avec Pyspark + envoi sur S3 (dbutils)

In [0]:
features_df.write.mode("overwrite").parquet("dbfs:/ml/tmp/flower_photos_features")


dbutils.fs.ls("/FileStore/tables/data/test_S3/")