# Projet 7 : Déployer un modèle dans le cloud

## I - Présentation du projet

Ce projet s'inscrit dans le cadre du développement d'une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

L'objectif de ce projet est de développer un environnement Big Data qui comprendra le preprocessing et une étape de réduction de dimension.

<img src="fruits.png">

<img src="agritech.jpeg">

#### Banque d'images

Le jeu de données est un ensemble d'images de fruits et de labels associés :
https://www.kaggle.com/moltean/fruits

<img src="images_folders.png">

L'exécution du fichier python peut de deux manières :
- en local en spécifiant l'argument True
- en mode AWS en spécifiant l'argument False

## II - Présentation de Pyspark

<img src="pyspark.png">

#### Les Resilient Distributed Dataset (RDD)

Les RDD sont la principale innovation apportée par Spark.
Ils possèdent deux types de méthodes :
- les transformations qui donnent en sortie des RDD
- les actions qui donnent en sortie un résultat

C'est au moment d'une action que les différentes transformations utilisées sont exécutées

<img src="rdd.png">

#### Distribution des calculs sur les executors

Un job Spark est constitué d'un ensemble d'étapes, elles-mêmes constitués d'un ensemble de tâches.

Un job Spark correspond à une action sur un RDD et est composé de plusieurs étapes séparées par des shuffles.

<img src="spark_2.png">

Chaque tâche s'éxecute sur une partition différente des données et ces partitions sont crées par les RDD.

Les partitions sont réparties sur les différents executors.

<img src="spark_1.png">

## III - Code de calcul

Les 2 fonctionnalités principales utilisées dans ce script sont les RDD et les udf.
Le principe des RDD a été décrit ci-dessus. Nous utilisons également les pyspark dataFrame qui utilisent la technicité des RDD.

Quant aux udf ils permettent d'ajouter une nouvelle colonne à un dataFrame, comme étant le résultat d'une fonction appliqué à une colonne existante.

Le code de calcul est composé de 6 blocs distincts :
- Le chargement des librairies
- La fonction de chargement des données qui renvoie un DataFrame contenant le chemin d'accès aux données
- La fonction d'exctraction des catégories qui s'utilise via une udf
- La fonction de lecture des images qui renvoie un nouveau DataFrame avec une colonne supplémentaire correspondant aux données images
- La fonction de réduction dimensionelle par PCA qui renvoie un nouveau DataFrame ajouté d'une colonne correspondant aux données réduites
- La fonction main qui execute toutes les fonctions listées ci-dessus et qui enregistre au format parquet les résultats
    
Le script python est à executer en fournissant un arguement True ou False selon qu'il est executé en local ou sur la plateforme AWS. Dans le cas d'une execution sur la plateforme AWS, la connexion se fait via la librairie boto3.

#### Chargement des librairies

In [12]:
# Fonctions pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, IntegerType, DoubleType, DataType, FloatType
from pyspark.ml.image import ImageSchema
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector


# Fonction pour ouvrir l'image à partir de son chemin d'accès
from PIL import Image

# Librairies classiques
import numpy as np
import sys
import os
import io
import time

# Librairie pour se connecter au service S3 d'AWS
import boto3

#### Fonction de chargement des données

In [3]:
def load_datas(folder):
    
    """
    Retourne un dataFrame pyspark avec comme colonne la liste des chemins d'accès
    de toutes les images se trouvant le dossier folder
    """

    # Initialisation du temps de calcul
    start_time = time.time()
    
    lst_path =  []
    
    # Suivant l'argument sys.argv[1], connexion en local ou sur AWS
    if sys.argv[1] == 'True':

        sub_folders = os.listdir(folder)
        print(sub_folders)

        for f in sub_folders:

            lst_categ = os.listdir(folder + f)

            for file in lst_categ:
                lst_path.append(folder + f + "/" + file)
    else :
        # Connexion à l'espace de stockage S3 d'AWS
        session = boto3.session.Session(aws_access_key_id="AKIAJBXCP6SHSOSCCATQ",
                                        aws_secret_access_key="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")
        s3_client = session.client(service_name='s3', region_name="us-east-1")

        prefix = 'data'
        sub_folders = s3_client.list_objects_v2(Bucket="mlgbucket", Prefix=prefix)

        if "Contents" not in sub_folders:
            print("Erreur lors du chargement des images")
            print("Le dossier source n'a pas été trouvé")
            sys.exit(0)

        for key in sub_folders["Contents"]:

            file = key["Key"]
            file = file.replace(prefix + "/", "")
            lst_path.append(folder + file)

    print("Nombre d'images chargées :", len(lst_path))
    # Création d'un RDD à partit de la liste des chemins d'accès aux images
    rdd = sc.parallelize(lst_path)
    row_rdd = rdd.map(lambda x: Row(x))
    # Création d'un dataFrame pyspark à partir d'un RDD
    df = spark.createDataFrame(row_rdd, ["path_img"])

    # Affichage du temps de calcul
    print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))

    return df

#### Fonction d'extraction des catégories

In [4]:
def extract_categ(path):
    
    """
    Retourne le nom du dossier dans lequel se trouve l'image,
    qui correspond au nom du type de fruits.
    """
    
    list_file = path.split("/")
    categ = list_file[-2]
    
    return categ

#### Fonction de lecture des images

In [5]:
def read_images(df, col_path='path_img', new_size=(20, 20)):
    
    """
    Cette fonction prend comme en entrée un dataframe pyspark avec les noms des chemins d'accès aux images, les ouvres
    et renvoie le dataframe d'entrée avec une colonne supplémentaire qui est l'image sous forme de liste.
    
    Paramètres
    df(pyspark DataFrame): contient une colonne avec le chemin d'accès aux images
    col_path(string): nom de la colonne où récupérer le chemin d'accès aux images
    new_size(tuple): nouvelle taille d'image
    """


    # Traitement en mode local
    if sys.argv[1] == 'True':
        
        # fonction identitée :  renvoie le même chemin d'accès
        def get_path(img_path):
            return img_path


    # Traitement en mode AWS
    else:
        
        # Traitement spécifique pour l'accès à S3 via la librairie boto3
        def get_path(img_path):
            img_path = img_path.replace("s3://mlgbucket/", "")
            s3 = boto3.resource("s3", region_name='us-east-1')
            bucket = s3.Bucket("mlgbucket")
            object = bucket.Object(img_path)
            response = object.get()
            file_stream = response['Body']
            return file_stream
        
    # Ouvre l'image via la librairie pillow et resize l'image pour des raisons de mémoires
    def open_img(img_path, size=new_size):

        image = Image.open(img_path)
        image = image.resize((20, 20))

        return image

    # Initilisation du temps de calcul
    start_time = time.time()

    # Retourne l'image correspondante sous forme de liste pour chaque chemin d'accès d'image
    # flatten() pour unidimensionnaliser le tableau (images couleurs)
    # tolist() car pyspark n'accepte pas le format numpy
    ud_f = udf(lambda img_path: np.asarray(open_img(get_path(img_path))).flatten().tolist())

    df = df.withColumn('image', ud_f(col_path))

    # Affiche le temps de calcul
    print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))

    
    return df

#### Fonction de réduction dimmensionnelle par PCA

In [17]:
def pca_transformation(df, n_components=50, col_image='image'):
    
    """
    Applique un algorithme de PCA sur l'ensemble des images pour réduire la dimension de chaque image 
    du jeu de données.
    
    Paramètres:
    df(pyspark dataFrame): contient une colonne avec les données images
    n_components(int): nombre de dimensions à conserver
    col_image(string): nom de la colonne où récupérer les données images
    """

    # Initilisation du temps de calcul
    start_time = time.time()

    # Les données images sont converties au format vecteur dense
    ud_f = udf(lambda r: Vectors.dense(r), VectorUDT())
    df = df.withColumn('image', ud_f('image'))
    
    standardizer = StandardScaler(inputCol="image", outputCol="scaledFeatures",
                                  withStd=True, withMean=True)
    model_std = standardizer.fit(df)
    df = model_std.transform(df)

    # Entrainement de l'algorithme
    pca = PCA(k=n_components, inputCol='scaledFeatures', outputCol='pcaFeatures')
    model_pca = pca.fit(df)

    # Transformation des images sur les k premières composantes
    df = model_pca.transform(df)

    df = df.filter(df.pcaFeatures.isNotNull())
    
    # Affiche le temps de calcul
    print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))


    return df

#### Fonction d'execution du programme

In [18]:
if __name__ == "__main__":
    
    # Définis le chemin d'accès au dossier des images
    # Chemins différents suivant si le script est executé en local ou sur AWS
    try :
        if sys.argv[1] == 'True':
            folder = "data/"
        else :
            folder = "s3://mlgbucket/data/"
    except :
        sys.exit(0)
    print(folder)

    # Démarre la session Spark
    try :
        sc = SparkContext.getOrCreate()
        sc.setLogLevel('WARN')
        spark = SparkSession.builder.appName("name").getOrCreate()
    except :
        print("Erreur à la construction du moteur spark")

    print("---Liste des images---")
    df = load_datas(folder)
    df.show(5, False)

    print("----Extraction des catégories images-----")
    udf_categ = udf(extract_categ, StringType())
    df = df.withColumn("categ", udf_categ('path_img'))

    print("---Chargement des images---")
    df = read_images(df)
    df.show(5)

    print("---Réduction dimmensionnelle---")
    df = pca_transformation(df)
    df.show(5)

    print("---- Enregistrement des résultats ----")
    # Initilisation du temps de calcul pour l'enregistrement
    start_time = time.time()
    
    # Ecrit les résultats en mode parquet
    if sys.argv[1] == 'True':
        df.write.parquet(path='results', mode='overwrite')
    else :
        df.write.parquet(path='s3://mlgbucket/results/', mode='overwrite')
    
    # Affiche le temps de calcul de l'écriture des résultats
    print("Temps d'execution : {:.2f} secondes".format(time.time() - start_time))

data/
---Liste des images---
['Orange', 'Lemon']
Nombre d'images chargées : 13
Temps d'execution 0.10 secondes
+-----------------------+
|path_img               |
+-----------------------+
|data/Orange/1_100.jpg  |
|data/Orange/134_100.jpg|
|data/Orange/2_100.jpg  |
|data/Orange/116_100.jpg|
|data/Orange/117_100.jpg|
+-----------------------+
only showing top 5 rows

----Extraction des catégories images-----
---Chargement des images---
Temps d'execution 0.03 secondes
+--------------------+------+--------------------+
|            path_img| categ|               image|
+--------------------+------+--------------------+
|data/Orange/1_100...|Orange|[254, 254, 252, 2...|
|data/Orange/134_1...|Orange|[254, 254, 253, 2...|
|data/Orange/2_100...|Orange|[254, 254, 252, 2...|
|data/Orange/116_1...|Orange|[253, 254, 253, 2...|
|data/Orange/117_1...|Orange|[255, 255, 253, 2...|
+--------------------+------+--------------------+
only showing top 5 rows

---Réduction dimmensionnelle---
Temps d'exec