# P8 - Déployer un modèle dans le cloud

# P8 - CLOUD - Microsoft Azure

Ce notebook traite de du chargement du jeu de données des images, du pré-processing, de la réduction de dimension et une classification pour des nouvelles images en utilisant **Microsoft Databricks Azure** et un **container blob de stockage** "**Data Lake Storage**".

## 1. Introduction

*****
**Mission**
*****
**Développer dans un environnement Big Data une première chaîne de traitement des données qui comprendra le preprocessing et une étape de réduction de dimension** pour une startup Fruits! de l'AgriTech pour mettre à disposition du grand public une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

*****
**Contraintes**
*****
- Le volume de données va augmenter très rapidement après la livraison de ce projet.
- Développer des scripts en Pyspark.
- Utiliser le cloud AWS ou autre (Microsoft Azure sera utilisé pour ce projet) pour profiter d’une architecture Big Data. 

*****
**Sources**
*****
- [Jeu de données](https://www.kaggle.com/moltean/fruits) : constitué des images de fruits et des labels associés, qui pourra servir de point de départ pour construire une partie de la chaîne de traitement des données.

In [0]:
%python
# Chargement des librairies
import datetime
import io
import sys
import time
from datetime import datetime
import pandas as pd
import numpy as np

# Visualisation
import matplotlib
import matplotlib.pyplot as plt
import seaborn as sns

# Pyspark
import pyspark
from pyspark.sql.functions import element_at, split, col, pandas_udf, PandasUDFType, udf
from pyspark.sql.types import StringType
from pyspark.sql import SparkSession

# Tensorflow Keras
import tensorflow as tf
from tensorflow.keras.applications.inception_v3 import InceptionV3, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array, load_img

# Gestion des images
import PIL
from PIL import Image

# Taches ML
from pyspark.ml.image import ImageSchema

# Réduction de dimension - PCA
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector

# Modélisation
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier, LogisticRegression, RandomForestClassifier, NaiveBayes
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml import Pipeline

# Matrice de confusion
import sklearn
from sklearn.metrics import confusion_matrix, classification_report
import itertools

%matplotlib inline


## 2. Préparation des données du train set

### 2.1. Jeu de données train set - au format "binaryFile"

#### 2.2.1. Connection du container blob de stockage à Azure Databricks

In [0]:
%python
dbutils.fs.unmount("/mnt/p8-cloud")

In [0]:
%python
dbutils.fs.mount(
  source = "wasbs://p8-cloud@p8cloud.blob.core.windows.net",
  mount_point = "/mnt/p8-cloud",
  extra_configs = {"fs.azure.account.key.p8cloud.blob.core.windows.net":"eO/iSSs9527scMebXetNiaApnpamHEq0G95gUYs3cBIjU3F4ZCUyx9xtzk9b05Aa8gqtL7kVXS94lTcWBFyBEQ=="})

#### 2.2.2. Chargement du jeu de données train set -"binaryFile"

In [0]:
%python
path_train_set = "/mnt/p8-cloud/resources/data/train-set/*/*"

In [0]:
%python
df_binary_train = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(path_train_set)

In [0]:
%python
df_binary_train.printSchema()

In [0]:
%python
df_binary_train.count()

In [0]:
%python
df_binary_train.show()

#### 2.2.2. Création de la classe des images

In [0]:
%python
# Ajout dans la colonne Classe pour chaque image traitée de l'avant dernier
df_binary_train = df_binary_train.withColumn("Classe", element_at(split(df_binary_train["path"], "/"), -2))

In [0]:
%python
# Schéma ?
df_binary_train.printSchema()

## 3. Extraction des features importantes pour chaque image

- Extraire les features les plus importantes pour la classification de nos images en utilisant un modèle **[InceptionV3](https://www.researchgate.net/figure/Schematic-diagram-of-the-Inception-v3-model-based-on-convolutional-neural-networks_fig3_337200783)** de deep learning pré-entrainé sur de la classification d'images.

### 3.1. Préparation du dataframe de travail

In [0]:
%python
df_images = df_binary_train.select("path", "Classe")
df_images.show()

### 3.2. Préparation du modèle InceptionV3

In [0]:
%python
# Instanciation du modèle
model = InceptionV3(
        include_top=False,  # Couche softmax de classification supprimée
        weights='imagenet',  # Poids pré-entraînés sur Imagenet
        input_shape=(100,100,3), # Image de taille 100x100 en couleur (channel=3)
        pooling='max' # Utilisation du max de pooling
)

In [0]:
%python
# Description des caractéristiques du modèle
model.summary()

### 3.3. Extraction des features pour chaque image

In [0]:
%python
# Instanciation du modèle
model = InceptionV3(
        include_top=False,  # Couche softmax de classification supprimée
        weights='imagenet',  # Poids pré-entraînés sur Imagenet
        input_shape=(100,100,3), # Image de taille 100x100 en couleur (channel=3)
        pooling='max' # Utilisation du max de pooling
)

In [0]:
%python
# Permettre aux workers Spark d'accéder aux poids utilisés par le modèle
bc_model_weights = spark.sparkContext.broadcast(model.get_weights())

In [0]:
%python
def model_fn():
  """
  Renvoie un modèle Inception3 avec la couche supérieure supprimée et les poids pré-entraînés sur imagenet diffusés.
  """
  model = InceptionV3(
        include_top=False,  # Couche softmax de classification supprimée
        weights='imagenet',  # Poids pré-entraînés sur Imagenet
#         input_shape=(100,100,3), # Image de taille 100x100 en couleur (channel=3)
        pooling='max' # Utilisation du max de pooling
  )
  model.set_weights(bc_model_weights.value)
  
  return model

In [0]:
%python
# Redimensionnement des images en 299x299
def preprocess(content):
    """
    Prétraite les octets de l'image brute pour la prédiction.
    param : content : objet image, obligatoire
    return : image redimensionnée en Array
    """
    # lecture + redimension (299x299) pour Xception
    img = PIL.Image.open(io.BytesIO(content)).resize([299, 299])
    # transforme l'image en Array     
    arr = img_to_array(img)
    return preprocess_input(arr)

In [0]:
%python
# Extraction des features par le modèle dans un vecteur
def featurize_series(model, content_series):
  """
  Featurise une pd.Series d'images brutes en utilisant le modèle d'entrée.
  param : 
    model : modèle à utiliser pour l'extraction, obligatoire.
    content_series : image redimensionnée (299, 299, 3) en Array
  :return: les features importantes de l'image en pd.Series.
  """
  input = np.stack(content_series.map(preprocess))
  # Prédiction du modèle
  preds = model.predict(input)
  # Pour certaines couches, les caractéristiques de sortie seront des tenseurs multidimensionnels.
  # Nous aplatissons les tenseurs de caractéristiques en vecteurs pour faciliter le stockage dans
  # les DataFrames de Spark.
  output = [p.flatten() for p in preds]
  
  return pd.Series(output)

In [0]:
%python
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
  '''
  Cette méthode est un Scalar Iterator pandas UDF enveloppant notre fonction de featurisation.
  Le décorateur spécifie que cette méthode renvoie une colonne Spark DataFrame de type ArrayType(FloatType).
  
  :param content_series_iter : Cet argument est un itérateur sur des lots de données, où chaque lot est une série pandas de données d'image.
  '''
  # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
  # for multiple data batches.  This amortizes the overhead of loading big models.
  model = model_fn()
  for content_series in content_series_iter:
    yield featurize_series(model, content_series)

In [0]:
%python
# Les UDF de Pandas sur de grands enregistrements (par exemple, de très grandes images) peuvent rencontrer des erreurs de type Out Of Memory (OOM).
# Si vous rencontrez de telles erreurs dans la cellule ci-dessous, essayez de réduire la taille du lot Arrow via `maxRecordsPerBatch`.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [0]:
%python
# Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.
# REMARQUE : Cela peut prendre beaucoup de temps (environ 10 minutes) car il applique un grand modèle à l'ensemble des données.
features_df = df_binary_train.repartition(16).select(col("path"), col('Classe'), featurize_udf("content").alias("features"))

In [0]:
%python
# 4484 images?
features_df.count()

### 3.4. Réduction de dimension

**Préparation des données**

In [0]:
%python
def preprocess_pca(dataframe):
  '''
     Préparation des données :
     - transformation en vecteur dense
     - standardisation
     param : dataframe : dataframe d'images
     return : dataframe avec features vecteur dense standardisé
  '''
  
  # Préparation des données - conversion des données images en vecteur dense
  transform_vecteur_dense = udf(lambda r: Vectors.dense(r), VectorUDT())
  dataframe = dataframe.withColumn('features_vectors', transform_vecteur_dense('features'))
  
  # Standardisation obligatoire pour PCA
  scaler_std = StandardScaler(inputCol="features_vectors", outputCol="features_scaled", withStd=True, withMean=True)
  model_std = scaler_std.fit(dataframe)
  # Mise à l'échelle
  dataframe = model_std.transform(dataframe)
  
  return dataframe

**Recherche du nombre de composante expliquant 95% de la variance**

In [0]:
%python
def recherche_nb_composante(dataframe, nb_comp=400):
    '''
       Recherche d nombre de composante expliquant 95% de la variance
       param : dataframe : dataframe d'images
       return : k nombre de composante expliquant 95% de la variance totale
    '''
    
    pca = PCA(k = nb_comp,
              inputCol="features_scaled", 
              outputCol="features_pca")
 
    model_pca = pca.fit(dataframe)
    variance = model_pca.explainedVariance
 
    # visuel
    plt.plot(np.arange(len(variance)) + 1, variance.cumsum(), c="red", marker='o')
    plt.xlabel("Nb composantes")
    plt.ylabel("% variance")
    plt.show(block=False)
 
    def nb_comp ():
      for i in range(500):
          a = variance.cumsum()[i]
          if a >= 0.95:
              print("{} composantes principales expliquent au moins 95% de la variance totale".format(i))
              break
      return i
 
    k=nb_comp()
  
    return k


In [0]:
%python
# Pré-processing (vecteur dense, standardisation)
df_pca = preprocess_pca(features_df)

In [0]:
%python
# Nombre de composante expliquant 95% de la variance
n_components = recherche_nb_composante(df_pca)

In [0]:
%python
# 325 composantes expliquent plus de 90% de la variance
n_components = 325

####Réduction de dimension PCA

In [0]:
%python
# Entrainement de l'algorithme
pca = PCA(k=n_components, inputCol='features_scaled', outputCol='vectors_pca')
model_pca = pca.fit(df_pca)

In [0]:
%python
# Transformation des images sur les k premières composantes
df_reduit = model_pca.transform(df_pca)

In [0]:
%python
# Visualisation du dataframe réduit
df_reduit.show()

####Sauvegarde des données

In [0]:
%python
# Sauvegarde des données
df_reduit.write.mode("overwrite").parquet("/mnt/p8-cloud/resources/output/resultats_features_parquet")

## 4. Test de classification

### 4.1. Préparation des données

**Seed**

In [0]:
%python
# Nombre aléatoire pour la reproductibilité des résultats
seed = 21

**Dataframe de travail**

In [0]:
%python
# Chargement du dataframe sauvegardé en parquet
parquetFiles = "/mnt/p8-cloud/resources/output/resultats_features_parquet/"

df_reduit = spark.read.parquet(parquetFiles)

In [0]:
%python
# Conservation de la classe de l'image et des vecteurs pca
data = df_reduit[["Classe", "vectors_pca"]]

In [0]:
%python
data.show(5)

**Encodage de la variable cible**

In [0]:
%python
# Encodage de la variable cible : la classe de l'image acceptable par le modèle
indexer = StringIndexer(inputCol="Classe", outputCol="Classe_index")

# Fit the indexer to learn Classe/index pairs
indexerModel = indexer.fit(data)

# Append a new column with the index
data = indexerModel.transform(data)

In [0]:
%python
display(data)

**Découpage du jeu du train set en jeux d'entraînement et de validation**

In [0]:
%python
# data splitting
(train_data, valid_data) = data.randomSplit([0.8, 0.2])

In [0]:
%python
print("Nbre élément train_data : " + str(train_data.count()))
print("Nbre élément valid_data : " + str(valid_data.count()))

In [0]:
%python
display(train_data.head(3))

### 4.2. Modélisation Logistic Regression

***Modélisation Régression Logistique***

[Source](https://spark.apache.org/docs/latest/ml-classification-regression.html#multinomial-logistic-regression)

***Entraînement du modèle***

In [0]:
%python
# Instanciation du modèle.
lr = LogisticRegression(labelCol="Classe_index", featuresCol="vectors_pca",
                        maxIter=5)

# Entraînement du modèle
lr_model = lr.fit(train_data)

***Prédictions***

In [0]:
%python
# Make predictions.
lr_predictions = lr_model.transform(valid_data)

# Select example rows to display.
lr_predictions.select("prediction", "Classe_index").show(5)

***Évaluation du modèle***

In [0]:
%python
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="Classe_index", predictionCol="prediction", metricName="accuracy")
lr_accuracy = evaluator.evaluate(lr_predictions)
print("Test Error = %g" % (1.0 - lr_accuracy))
print("Accuracy = %g " % lr_accuracy)

***Informations sur le modèle***

In [0]:
%python
# Print the coefficients and intercept for multinomial logistic regression
print("Coefficients: \n" + str(lr_model.coefficientMatrix))
print("Intercept: " + str(lr_model.interceptVector))

trainingSummary = lr_model.summary

# Obtain the objective per iteration
objectiveHistory = trainingSummary.objectiveHistory
print("objectiveHistory:")
for objective in objectiveHistory:
    print(objective)

# for multiclass, we can inspect metrics on a per-label basis
print("False positive rate by label:")
for i, rate in enumerate(trainingSummary.falsePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("True positive rate by label:")
for i, rate in enumerate(trainingSummary.truePositiveRateByLabel):
    print("label %d: %s" % (i, rate))

print("Precision by label:")
for i, prec in enumerate(trainingSummary.precisionByLabel):
    print("label %d: %s" % (i, prec))

print("Recall by label:")
for i, rec in enumerate(trainingSummary.recallByLabel):
    print("label %d: %s" % (i, rec))

print("F-measure by label:")
for i, f in enumerate(trainingSummary.fMeasureByLabel()):
    print("label %d: %s" % (i, f))

accuracy = trainingSummary.accuracy
falsePositiveRate = trainingSummary.weightedFalsePositiveRate
truePositiveRate = trainingSummary.weightedTruePositiveRate
fMeasure = trainingSummary.weightedFMeasure()
precision = trainingSummary.weightedPrecision
recall = trainingSummary.weightedRecall
print("Accuracy: %s\nFPR: %s\nTPR: %s\nF-measure: %s\nPrecision: %s\nRecall: %s"
      % (accuracy, falsePositiveRate, truePositiveRate, fMeasure, precision, recall))

### 4.3. Modélisation Decision Tree Classifier

***Modélisation Decision Tree Classifier***

[Source](https://spark.apache.org/docs/latest/ml-classification-regression.html#decision-tree-classifier)

***Entraînement du modèle***

In [0]:
%python
# Instanciation du modèle.
dtc = DecisionTreeClassifier(labelCol="Classe_index", featuresCol="vectors_pca",
                             seed=seed)

# Entraînement du modèle
dtc_model = dtc.fit(train_data)

***Prédictions***

In [0]:
%python
# Make predictions.
dtc_predictions = dtc_model.transform(valid_data)

# Select example rows to display.
dtc_predictions.select("prediction", "Classe_index").show(5)

***Évaluation du modèle***

In [0]:
%python
# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="Classe_index", predictionCol="prediction", metricName="accuracy")
dtc_accuracy = evaluator.evaluate(dtc_predictions)
print("Test Error = %g" % (1.0 - dtc_accuracy))
print("Accuracy = %g " % dtc_accuracy)

***Informations sur le modèle***

In [0]:
%python
print(dtc_model.toDebugString)