# Projet 8 

## - [I - Présentation du projet](#I---Présentation-du-projet)
- [II - Banque d'images](#II---Banque-d'images)
- [III - Code de calcul](#III---Code-de-calcul)
- [IV - Chargement des librairies](#IV---Chargement-des-librairies)
- [V - Chargement des données depuis S3](#V---Chargement-des-données-depuis-S3)
- [VI - Extraction des catégories](#VI---Extraction-des-catégories)
- [VII - Lecture des images](#VII---Lecture-des-images)
- [VIII - Réduction dimmensionnelle par PCA](#VIII---Réduction-dimmensionnelle-par-PCA)
- [VIIII - Exécution du programme](#VIIII---Exécution-du-programme)
- [X - Enregistrement dans S3](#X---Enregistrement-dans-S3)

### I - Présentation du projet

Ce projet s'inscrit dans le cadre du développement d'une application mobile qui permettrait aux utilisateurs de prendre en photo un fruit et d'obtenir des informations sur ce fruit.

L'objectif de ce projet est de développer un environnement Big Data qui comprendra le preprocessing et une étape de réduction de dimension.

### II - Banque d'images

Le jeu de données est un ensemble d'images de fruits et de labels associés :
https://www.kaggle.com/moltean/fruits

### III - Code de calcul

Les 2 fonctionnalités principales utilisées dans ce script sont les RDD et les udf.
Le principe des RDD a été décrit ci-dessus. Nous utilisons également les pyspark dataFrame qui utilisent la technicité des RDD.

Quant aux udf ils permettent d'ajouter une nouvelle colonne à un dataFrame, comme étant le résultat d'une fonction appliqué à une colonne existante.

Le code de calcul est composé de 6 blocs distincts :
- Le chargement des librairies
- La fonction de chargement des données qui renvoie un DataFrame contenant le chemin d'accès aux données
- La fonction d'exctraction des catégories qui s'utilise via une udf
- La fonction de lecture des images qui renvoie un nouveau DataFrame avec une colonne supplémentaire correspondant aux données images
- La fonction de réduction dimensionelle par PCA qui renvoie un nouveau DataFrame ajouté d'une colonne correspondant aux données réduites
- La fonction main qui execute toutes les fonctions listées ci-dessus et qui enregistre au format parquet les résultats
    
Le script python est à executer en fournissant un arguement True ou False selon qu'il est executé en local ou sur la plateforme AWS. Dans le cas d'une execution sur la plateforme AWS, la connexion se fait via la librairie boto3.

#### IV - Chargement des librairies

In [1]:
# Fonctions pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType, IntegerType, DoubleType, DataType, FloatType
from pyspark.ml.image import ImageSchema
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StandardScaler
from pyspark.ml.linalg import Vectors, VectorUDT, DenseVector


# Fonction pour ouvrir l'image à partir de son chemin d'accès
from PIL import Image

# Librairies classiques
import numpy as np
import sys
import os
import io
import time

# Librairie pour se connecter au service S3 d'AWS
import boto3

In [2]:
import PIL

#### V - Chargement des données depuis S3

In [3]:
def load_datas(folder):
    
    """
    Retourne un dataFrame pyspark avec comme colonne la liste des chemins d'accès
    de toutes les images se trouvant le dossier folder
    """

    # Initialisation du temps de calcul
    start_time = time.time()
    
    lst_path =  []
    
    # Connexion à l'espace de stockage S3 d'AWS
    session = boto3.session.Session(aws_access_key_id="XXXXXXXXXXXX",
                                    aws_secret_access_key="YYYYYYYYYY+6uiK9AMoBB8")
    s3_client = session.client(service_name='s3', region_name="us-east-1")

    prefix = 'data'
    sub_folders = s3_client.list_objects_v2(Bucket="mel-calculsdistribues", Prefix=prefix)

    if "Contents" not in sub_folders:
        print("Erreur lors du chargement des images")
        print("Le dossier source n'a pas été trouvé")
        sys.exit(0)

    for key in sub_folders["Contents"]:

        file = key["Key"]
        file = file.replace(prefix + "/", "")
        lst_path.append(folder + file)

    print("Nombre d'images chargées :", len(lst_path))
    # Création d'un RDD à partit de la liste des chemins d'accès aux images
    rdd = sc.parallelize(lst_path)
    row_rdd = rdd.map(lambda x: Row(x))
    # Création d'un dataFrame pyspark à partir d'un RDD
    df = spark.createDataFrame(row_rdd, ["path_img"])

    # Affichage du temps de calcul
    print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))

    return df

#### VI - Extraction des catégories

In [4]:
def extract_categ(path):
    
    """
    Retourne le nom du dossier dans lequel se trouve l'image,
    qui correspond au nom du type de fruits.
    """
    
    list_file = path.split("/")
    categ = list_file[-2]
    
    return categ

#### VII - Lecture des images

In [5]:
def read_images(df, col_path='path_img', new_size=(20, 20)):
    
    """
    Cette fonction prend comme en entrée un dataframe pyspark avec les noms des chemins d'accès aux images, les ouvres
    et renvoie le dataframe d'entrée avec une colonne supplémentaire qui est l'image sous forme de liste.
    
    Paramètres
    df(pyspark DataFrame): contient une colonne avec le chemin d'accès aux images
    col_path(string): nom de la colonne où récupérer le chemin d'accès aux images
    new_size(tuple): nouvelle taille d'image
    """

        
    # Traitement spécifique pour l'accès à S3 via la librairie boto3
    def get_path(img_path):
        img_path = img_path.replace("s3://mel-calculsdistribues/", "")
        s3 = boto3.resource("s3", region_name='us-east-1')
        bucket = s3.Bucket("mel-calculsdistribues")
        object = bucket.Object(img_path)
        response = object.get()
        file_stream = response['Body']
        return file_stream
        
    # Ouvre l'image via la librairie pillow et resize l'image pour des raisons de mémoires
    def open_img(img_path, size=new_size):

        image = Image.open(img_path)
        image = image.resize((20, 20))

        return image

    # Initilisation du temps de calcul
    start_time = time.time()

    # Retourne l'image correspondante sous forme de liste pour chaque chemin d'accès d'image
    # flatten() pour unidimensionnaliser le tableau (images couleurs)
    # tolist() car pyspark n'accepte pas le format numpy
    ud_f = udf(lambda img_path: np.asarray(open_img(get_path(img_path))).flatten().tolist())

    df = df.withColumn('image', ud_f(col_path))

    # Affiche le temps de calcul
    print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))

    
    return df

#### VIII - Réduction dimmensionnelle par PCA

In [6]:
def pca_transformation(df, n_components=50, col_image='image'):
    
    """
    Applique un algorithme de PCA sur l'ensemble des images pour réduire la dimension de chaque image 
    du jeu de données.
    
    Paramètres:
    df(pyspark dataFrame): contient une colonne avec les données images
    n_components(int): nombre de dimensions à conserver
    col_image(string): nom de la colonne où récupérer les données images
    """

    # Initilisation du temps de calcul
    start_time = time.time()

    # Les données images sont converties au format vecteur dense
    ud_f = udf(lambda r: Vectors.dense(r), VectorUDT())
    df = df.withColumn('image', ud_f('image'))
    
    standardizer = StandardScaler(inputCol="image", outputCol="scaledFeatures",
                                  withStd=True, withMean=True)
    model_std = standardizer.fit(df)
    df = model_std.transform(df)

    # Entrainement de l'algorithme
    pca = PCA(k=n_components, inputCol='scaledFeatures', outputCol='pcaFeatures')
    model_pca = pca.fit(df)

    # Transformation des images sur les k premières composantes
    df = model_pca.transform(df)

    df = df.filter(df.pcaFeatures.isNotNull())
    
    # Affiche le temps de calcul
    print("Temps d'execution {:.2f} secondes".format(time.time() - start_time))


    return df

#### VIIII - Exécution du programme

In [7]:
if __name__ == "__main__":
    
    # Définis le chemin d'accès au dossier des images
    # Chemins différents suivant si le script est executé en local ou sur AWS
    try :
        folder = "s3://mel-calculsdistribues/data/"
    except :
        sys.exit(0)
    print(folder)

    # Démarre la session Spark
    try :
        #sc = SparkContext.getOrCreate()
        
        import os
        os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.15,org.apache.hadoop:hadoop-aws:3.0.0 pyspark-shell'
        
        
        #sc.setLogLevel('WARN')
        spark = SparkSession.builder.appName("App_Fruits")\
            .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")\
            .config("spark.hadoop.fs.s3a.access.key", "XXXXXXXXXX")\
            .config("spark.hadoop.fs.s3a.secret.key", "YYYYYYYYYYYYYY+6uiK9AMoBB8")\
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\
            .config("spark.hadoop.fs.s3a.multiobjectdelete.enable","false")\
            .config("spark.hadoop.fs.s3a.fast.upload","true")\
            .config("spark.sql.parquet.filterPushdown", "true")\
            .config("spark.sql.parquet.mergeSchema", "false")\
            .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")\
            .config("spark.speculation", "false")\
            .config("spark.sql.parquet.writeLegacyFormat", 'true')\
            .getOrCreate()
        
        sc = spark.sparkContext
        sc.setSystemProperty('com.amazonaws.services.enableV4', 'true')
        sc._jsc.hadoopConfiguration().set('fs.s3a.endpoint','s3.eu-west-1.amazonaws.com')
      
    except :
        print("Erreur à la construction du moteur spark")

    print("---Liste des images---")
    df = load_datas(folder)
    df.show(5, False)

    print("----Extraction des catégories images-----")
    udf_categ = udf(extract_categ, StringType())
    df = df.withColumn("categ", udf_categ('path_img'))

    print("---Chargement des images---")
    df = read_images(df)
    df.show(5)

    print("---Réduction dimmensionnelle---")
    df = pca_transformation(df)
    df.show(5)



s3://mel-calculsdistribues/data/
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/mohammed/.ivy2/cache
The jars for the packages stored in: /home/mohammed/.ivy2/jars
com.amazonaws#aws-java-sdk-pom added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-6c92e697-0d19-49f2-b77a-f7e68a211138;1.0
	confs: [default]
	found com.amazonaws#aws-java-sdk-pom;1.10.15 in central
	found org.apache.hadoop#hadoop-aws;3.0.0 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.199 in central
:: resolution report :: resolve 847ms :: artifacts dl 26ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.199 from central in [default]
	com.amazonaws#aws-java-sdk-pom;1.10.15 from central in [default]
	org.apache.hadoop#hadoop-aws;3.0.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwn

---Liste des images---
Nombre d'images chargées : 1000


                                                                                

Temps d'execution 16.59 secondes
+--------------------------------------------------+
|path_img                                          |
+--------------------------------------------------+
|s3://mel-calculsdistribues/data/apple_6/r0_0.jpg  |
|s3://mel-calculsdistribues/data/apple_6/r0_10.jpg |
|s3://mel-calculsdistribues/data/apple_6/r0_100.jpg|
|s3://mel-calculsdistribues/data/apple_6/r0_102.jpg|
|s3://mel-calculsdistribues/data/apple_6/r0_104.jpg|
+--------------------------------------------------+
only showing top 5 rows

----Extraction des catégories images-----
---Chargement des images---
Temps d'execution 0.02 secondes


                                                                                

+--------------------+-------+--------------------+
|            path_img|  categ|               image|
+--------------------+-------+--------------------+
|s3://mel-calculsd...|apple_6|[255, 255, 255, 2...|
|s3://mel-calculsd...|apple_6|[255, 255, 255, 2...|
|s3://mel-calculsd...|apple_6|[255, 255, 255, 2...|
|s3://mel-calculsd...|apple_6|[255, 255, 255, 2...|
|s3://mel-calculsd...|apple_6|[255, 255, 255, 2...|
+--------------------+-------+--------------------+
only showing top 5 rows

---Réduction dimmensionnelle---


                                                                                

Temps d'execution 367.10 secondes


[Stage 13:>                                                         (0 + 1) / 1]

+--------------------+-------+--------------------+--------------------+--------------------+
|            path_img|  categ|               image|      scaledFeatures|         pcaFeatures|
+--------------------+-------+--------------------+--------------------+--------------------+
|s3://mel-calculsd...|apple_6|[255.0,255.0,255....|[0.0,0.0,0.0,0.0,...|[32.6642765091259...|
|s3://mel-calculsd...|apple_6|[255.0,255.0,255....|[0.0,0.0,0.0,0.0,...|[31.8495768511679...|
|s3://mel-calculsd...|apple_6|[255.0,255.0,255....|[0.0,0.0,0.0,0.0,...|[21.4767784999753...|
|s3://mel-calculsd...|apple_6|[255.0,255.0,255....|[0.0,0.0,0.0,0.0,...|[21.1599890188432...|
|s3://mel-calculsd...|apple_6|[255.0,255.0,255....|[0.0,0.0,0.0,0.0,...|[20.7566495717913...|
+--------------------+-------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

#### X - Enregistrement dans S3

In [9]:
df.write.mode('overwrite').format('parquet').save("s3a://mel-calculsdistribues/results_local")

                                                                                

In [11]:
sc.stop()

In [12]:
spark.stop()

## Fin