# 2. Déploiement de la solution sur le cloud

## 2.1 Installations et Path

### 2.1.1 Installation des packages

Les packages nécessaires ont été installé via l'étape de **bootstrap** à l'instanciation du serveur

### 2.1.2 Import des librairies

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import keras
import io
import os

In [None]:
from PIL import Image
from tensorflow.keras.applications.resnet50 import ResNet50,  preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, udf, pandas_udf, PandasUDFType, element_at, split
from keras.layers import Input, Dense
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT

### 2.1.3 Définition des PATH 

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [None]:
PATH = 's3://colab-notebook-p8'
PATH_Data = PATH+'/Data'
PATH_Preprocessing = PATH+'/Preprocessing'
PATH_Result = PATH+'/Results'

print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Preprocessing: '+\
      PATH_Preprocessing +\
      '\nPATH_Result: '+PATH_Result)

### 2.1.4 Démarrage de la session Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

# Initialiser la session Spark
spark = SparkSession.builder \
    .appName("P8") \
    .getOrCreate()

## 2.3 Chargement des images

<u> Chargement des images </u>

Nous allons charger tous les fichiers d'images JPEG depuis un stockage S3,</br> y compris tous les sous-répertoires, dans un DataFrame Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import input_file_name

# Lire les images depuis S3
images = spark.read.format("binaryFile") \
    .option("pathGlobFilter", "*.jpg") \
    .option("recursiveFileLookup", "true") \
    .load(PATH_Data)

images.show(5)

<u>Je ne conserve que le **path** de l'image et j'ajoute <br />
    une colonne contenant les **labels** de chaque image</u> 

In [None]:
# Ajouter une colonne 'label' à partir du chemin
image_df = images.withColumn('label', element_at(split(images['path'], '/'), -2))

<u> Affichages des images </u>

In [None]:
# Afficher le schéma du DataFrame
image_df.printSchema()

# Afficher quelques exemples d'images
print("Afficher des images")
print(image_df.select('path','label').show(5,False))

## 2.4 Préprocessing des images

### 2.4.1 Préparation du modèle

Je vais utiliser la technique du **transfert learning** pour extraire les features des images.<br />
J'ai choisi d'utiliser le modèle **ResNet 50** pour sa rapidité d'exécution <br />

Nous chargeons le modèle ResNet50 avec les poids **précalculés** <br/>
issus d'imagenet, incluant la **couche de classification finale** <br/>  et en spécifiant le format de nos images en entrée

In [None]:
# Charger le modèle et diffuser les poids
model = ResNet50(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Nous créons un nouveau modèle avec:<br/>
+ en entrée : l'entrée du modèle ResNet50 <br/>
+ en sortie : l'avant dernière couche du modèle ResNet50

In [None]:
new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)

La méthode broadcast de SparkContext permet de créer un objet de diffusion qui <br/> est partagé entre tous les nœuds du cluster <br/>
Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids. <br />
Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser <br />
ensuite les poids aux différents workeurs.

In [None]:
brodcast_weights = spark.sparkContext.broadcast(new_model.get_weights())

Affichage du résumé de notre nouveau modèle où nous constatons <br />
que <u>nous récupérons bien en sortie un vecteur de dimension (1, 1, 1280)</u> :

In [None]:
new_model.summary()

<u>Mettons cela sous forme de fonction</u> :

In [None]:
def model_fn():
    """
    Returns a ResNet50 model with top layer removed 
    and broadcasted pretrained weights.
    """
    # Créer le modèle ResNet50 avec les poids préentraînés sur ImageNet
    model = ResNet50(weights='imagenet',
                     include_top=True,
                     input_shape=(224, 224, 3))
    
    # Gel des couches pour éviter leur entraînement
    for layer in model.layers:
        layer.trainable = False
    
    # Créer un nouveau modèle sans la couche de classification finale
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    
    # Affecter les poids préentraînés (si nécessaire)
    new_model.set_weights(brodcast_weights.value)  

    return new_model

### 2.4.2  Processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

#### 2.4.2.1 Processus de chargement des images.

 L'utilisation des pandas UDFs dans Apache Spark pour intégrer des modèles de machine learning <br/> est un excellent moyende traitement de données. <br/> En permettant le traitement par lots et en évitant le rechargement du modèle à chaque itération,<br/> cette approche optimise les performances et la gestion des ressources. 

 La fonction **preprocess** est conçue pour préparer des images brutes <br />
 (sous forme de bytes) afin qu'elles puissent être utilisées pour faire des prédictions avec un modèle de machine learning.<br /> Cela inclut le redimensionnement de l'image et la conversion en un format approprié.

In [None]:
def preprocess(content):
    try:
        # Ouvrir l'image à partir des octets
        img = Image.open(io.BytesIO(content)).resize([224, 224])
        # convertir l'image en tableau Tableau NumPy de la forme (224, 224, 3)
        arr = img_to_array(img)
        return preprocess_input(arr)
    except Exception as e:
        print(f"Erreur lors du traitement de l'image : {e}")
        return np.zeros((224, 224, 3))  # Retourner un tableau vide ou une image par défaut

La fonction **featurize_series** prend un modèle de machine learning <br /> et une série de données d'images brutes (sous forme de pd.Series de pandas)<br /> et renvoie les caractéristiques extraites de ces images.

In [None]:
def featurize_series(model, content_series):
    # Appliquer la fonction de prétraitement et empiler les résultats
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
     # Aplatir les tenseurs de caractéristiques pour un stockage plus facile
    output = [p.flatten() for p in preds]
    return pd.Series(output)

 La fonction **featurize_udf** est un User Defined Function (UDF) <br /> pour Apache Spark qui permet d'appliquer la fonction featurize_series à des séries de données d'images dans un contexte distribué.<br /> Cela permet de traiter efficacement de grandes quantités d'images en parallèle.

In [None]:
@pandas_udf(ArrayType(FloatType()), PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

#### 2.4.2.2 Exécution des actions d'extraction de features

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), <br />
peuvent rencontrer des erreurs de type Out Of Memory (OOM).<br />
Si vous rencontrez de telles erreurs dans la cellule ci-dessous, <br />
essayez de réduire la taille du lot Arrow via 'maxRecordsPerBatch'

Je n'utiliserai pas cette commande dans ce projet <br />
et je laisse donc la commande en commentaire.

In [48]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.<br />
<u>REMARQUE</u> : Cela peut prendre beaucoup de temps, tout dépend du volume de données à traiter. <br />

<u> Appliquer l'UDF et récupérer les prédictions </u>

Créons un nouveau DataFrame predictions_df qui contient trois colonnes dont une seule colonne, "features",</br> remplie des prédictions générées par le modèle pour chaque entrée dans la colonne "content" du DataFrame d'origine image_df.

In [None]:
predictions_df = image_df.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )
print(f"Nombre de lignes dans predictions_df : {predictions_df.count()}")

In [None]:
predictions_df.write.mode("overwrite").parquet(PATH_Preprocessing)
print("Enregistrement de predictions_df effectué avec succès")

In [None]:
print(predictions_df.show(2,False))

## 2.5 Appliquer le PCA

 La colonne **"features"** dans le DataFrame predictions_df est mise à jour</br> pour contenir des **vecteurs denses** au lieu de listes ou de tableaux.

In [None]:
@udf(returnType=VectorUDT())
def to_vector(arr):
  return Vectors.dense(arr)

In [None]:
predictions_df = predictions_df.withColumn("features", to_vector(col("features")))
print("Mise à jour avec lec vecteurs denses")
# Vérification des résultats
predictions_df.printSchema()

<u> PCA </u>

In [None]:
# Créer une instance de PCA
pca = PCA(k=5, inputCol="features", outputCol="pca_features")

# Ajuster le modèle PCA
pca_model = pca.fit(predictions_df)

# Transformer les données
pca_result = pca_model.transform(predictions_df)

<u> Enregistrement des resultats </u>

In [None]:
# Enregistrer les résultats au format "Parquet" 
pca_result.write.mode("overwrite").parquet(PATH_Result)
print("Enregistrement au format Parquet effectué avec succès")

## 2.6 Chargement des données enregistrées

<u>On charge les données fraichement enregistrées dans un **DataFrame Spark**</u>  :

In [None]:
df = spark.read.parquet(PATH_Result)

<u>On affiche les 2 premières lignes du DataFrame</u> :

In [None]:
print("Afficher les resultats")
df.show(2,truncate=False)

<u>On affiche les dimension du DataFrame Spark</u> :

In [None]:
print("Afficher les dimensions")
num_rows = df.count()
num_columns = len(df.columns)
print(f"Dimensions: ({num_rows}, {num_columns})")

## 2.7 Arrêt de la session spark

In [None]:
# Arrêter Spark
spark.stop()