# <u>PROJET 8: "Déployez un modèle dans le cloud"

**Configuration:**
- Ce notebook est exécuté sur une instance AWS EC2, de type t2.medium (2 processeurs) RAM de 4Go, sur un système d'explotation Ubuntu 18.04. Afin de garantir la sécurité d'utilisation, la connexion à cette instance est effectué via un tunnel SSH permettant d'accéder à un terminal de l'instance. 
- Le script contenu dans ce notebook ne peuvent fonctionner qu'après avoir téléchargé et installé Anaconda 3 (version 2019/03), Java 8, Scala 12, ainsi que Spark version 2.4.7 avec hadoop 2.7.
- Il est nécessaire d'intaller les packages suivants: findspark, boto3, opencv-python, numpy
- Cette instance est capable d'accéder au compartiments S3 de mon compte AWS, grâce au rôle IAM qui lui a été attribué. Il n'est donc pas nécessaire de configurer les credentials AWS au début du script. 
- L'accès à ce notebook jupyter se fait via serveur web, comme indiqué dans ce tutoriel: https://openclassrooms.com/fr/courses/4452741-decouvrez-les-librairies-python-pour-la-data-science/5559821-lancez-une-session-de-notebook-jupyter-sur-aws

**Contexte**
- Ce notebook permet d'évaluer la faisabilité d'établissement d'une chaîne de prétraitement de données visuelles.
- Le données (images de fruits contenues dans un dossier par catégorie) sont stockées sur le cloud en utilisant le service s3.
- Nous utiliserons Spark afin de parralléliser les opérations de calcul sur tous les processeurs disponibles.
- L'objectif de ce script est d'importer les images, extraires leurs catégories, extraire les points d'intérêt de chaque image (avec descripteurs ORB), et réduire la dimension des descripteurs des points d'intérêt de chaque image avec une ACP. Les résultats de ces transformations seront enfin transférées et stockées dans un dossier 'results' dans le bucket S3 contenant les données initiales.

## <u>Sommaire:
### [I. Chargement des images dans un dataframe Spark](#section)
### [II. Détection et extraction de features avec descripteurs ORB](#section2)
### [III. Réduction dimensionnelle des descripteurs (ACP)](#section3)
### [VI. Enregistrement des résultats sur le répertoire S3](#section4)

In [1]:
import os

# Nous permettra de charger les dépendances nécessaires à certaines focntionnalités de Spark (hadoop-aws et java-aws)
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.11.538,org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell'

Pour démarrer le notebook, nous utiliserons la méthode init de Findspark. Cela permet d'ajouter un fichier de démarrage au profil IPython actuel afin que les variables d'environnement soient correctement définies et que pyspark soit importé au démarrage d'IPython.

In [2]:
import findspark
findspark.init()

Nous allons importer les différents modules et librairies que nous utiliserons pour effectuer les différentes transformations sur les données.

In [3]:
import boto3  # facilite l'intéraction des instances EC2 avec S3 
import numpy as np  # permet de redimensionner boto3

# Permet de créer une session Spark qui joue de rôle de pilote (Driver)
# de la façon dont Spark exécute les fonctions dans l’ensemble du cluster
from pyspark import SparkContext 
from pyspark.sql import SparkSession

# Fonctions permettant de maipuler les dataframes Spark (transformations)
from pyspark.sql.functions import lit  # permet de créer une colonne de type littéral (str)
from pyspark.sql.functions import udf  # permet d'appliquer une fonction donnée sur un df (user defined function)
from pyspark.sql.functions import explode  # retourne une ligne pour chaque élément d'une colonne donnée
from pyspark.sql.types import *  # permettra de manipuler tout tyle de données dans les udf
from functools import reduce  # permet de fusionner des dataframes spark

# Fonctions de machine learning pour dataframes spark
from pyspark.ml.image import ImageSchema  # permet de lire et stocker les caractéristiques d'une image
from pyspark.ml.linalg import Vectors, VectorUDT  #  permet de transformer un ojet de type liste en un vecteur spark
from pyspark.ml.feature import PCA  # permet d'appliquer une analyse en composantes princiaples sur des vecteurs
import cv2 as cv  # bibliothèque OpenCV contenant les focntions de descriptions ORB

## <u>I. Chargement des images dans un dataframe Spark<u><a name="section1"></a><u>

Nous allons d'abord initialiser une session Spark qui fera office de noeud maître, et permettra la parallélisation et la répartition entre les différents processeur disponibles. (Système Driver - Workers)

In [4]:
# Pilote (Driver) qui sera chargé de commander les processeurs en parallèle,
# pour effectuer les prochaines transformations et opérations spark
spark = (SparkSession.builder.appName("Prevot-P8")\
         .config('spark.hadoop.fs.s3a.impl',
                 'org.apache.hadoop.fs.s3a.S3AFileSystem')\
         .getOrCreate())

# SparkContext est le point d’accès à toutes les fonctionnalités de Spark,
# Il est contenu dans la Spark Session
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.us-east-2.amazonaws.com")  # configure accès S3
sc.setSystemProperty('com.amazonaws.services.s3.enableV4', 'true')

Nous pouvons désormais affecter une première tâche: charger les images dans un dataframe, et obtenir les étiquettes de chaque image.

In [5]:
bucket='p8-prevot-2'  # nom du bucket s3 contenant le dossier des images
prefix='data2/Test/'  # chemin d'accès vers les dossiers d'images

client = boto3.client('s3')  # objet de liaison vers S3 de mon compte AWS
result = client.list_objects(Bucket=bucket, Prefix=prefix, Delimiter='/')  # permet de charger le contenu du bucket

dataframes = []  # nous allons stocker une liste de dataframes (un df par dossier d'image, donc par type de fruit)

for o in result.get('CommonPrefixes'):  
    folder = o.get('Prefix')
    sub_folder = folder.split('/')[2]
    data_path = 's3a://{}/{}'.format(bucket, folder)
    print(data_path) # affichage du sous-dossier
    # Création d'un dataframe spark contenant les images et leurs caractéristiques
    images_df = ImageSchema.readImages(data_path, recursive=True).withColumn("label", lit(sub_folder))
    dataframes.append(images_df)  # ajout à la liste des dataframes

df = reduce(lambda first, second: first.union(second), dataframes)  # fusion des dfs (ayant tous le même schema)
df = df.repartition(200) # répartiton en plusieurs partitions

s3a://p8-prevot-2/data2/Test/apple_pink_lady_1/
s3a://p8-prevot-2/data2/Test/carrot_1/
s3a://p8-prevot-2/data2/Test/cucumber_3/
s3a://p8-prevot-2/data2/Test/pear_1/
s3a://p8-prevot-2/data2/Test/zucchini_1/


Chaque ligne du dataframe correspond à une image. Pour chaque image nous disposons de:
- Première colonne (image): origin = chemin d'accès au fichier sur s3
- Première colonne (image): height = hauteur (pixels)
- Première colonne (image): width = largeur (pixels)
- Première colonne (image): nChannels = 3 (RVB)
- Première colonne (image): mode = format d'encodage de l'image
- Première colonne (image): data = tableau contenant l'image encodée en binary
- Deuxième colonne (label): Etiquette de l'image (fruit)

In [6]:
df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)
 |-- label: string (nullable = false)



In [7]:
df.show()

+--------------------+-----------------+
|               image|            label|
+--------------------+-----------------+
|[s3a://p8-prevot-...|       zucchini_1|
|[s3a://p8-prevot-...|       zucchini_1|
|[s3a://p8-prevot-...|       zucchini_1|
|[s3a://p8-prevot-...|       zucchini_1|
|[s3a://p8-prevot-...|       zucchini_1|
|[s3a://p8-prevot-...|       cucumber_3|
|[s3a://p8-prevot-...|       cucumber_3|
|[s3a://p8-prevot-...|       cucumber_3|
|[s3a://p8-prevot-...|       cucumber_3|
|[s3a://p8-prevot-...|       cucumber_3|
|[s3a://p8-prevot-...|           pear_1|
|[s3a://p8-prevot-...|           pear_1|
|[s3a://p8-prevot-...|           pear_1|
|[s3a://p8-prevot-...|           pear_1|
|[s3a://p8-prevot-...|           pear_1|
|[s3a://p8-prevot-...|apple_pink_lady_1|
|[s3a://p8-prevot-...|apple_pink_lady_1|
|[s3a://p8-prevot-...|apple_pink_lady_1|
|[s3a://p8-prevot-...|apple_pink_lady_1|
|[s3a://p8-prevot-...|apple_pink_lady_1|
+--------------------+-----------------+
only showing top

## <u>II. Détection et extraction de features avec descripteurs ORB<u><a name="section2"></a><u>

In [8]:
def orb_descriptors(img):
    ''' Fonction prenant en entrée une image du dataframe 
    et qui renvoie la liste des descripteurs ORB des points d'intérêts détectés sur l'image'''
    
    height = img[1]
    width = img[2]
    nchannels = img[3]
    data = img[5]
    
    img_array = np.array(data).reshape(height, width, nchannels)  # conversion et redimensionnement de l'image
    
    orb = cv.ORB_create(nfeatures=30)  # création d'un détecteur orb (30 points d'intérêts maximum)
    kp, des = orb.detectAndCompute(img_array,None)  # détection et description des pts d'intérêts
    
    des = des.tolist()  # on a un tableau contenant des descripteurs de 32 composantes chacuns
    
    return des

# user define function permettant la création de la colonne descripteurs suiveant la fonction "orb_descriptors"
orb_UDF = udf(lambda img: orb_descriptors(img), ArrayType(ArrayType(IntegerType())))
df = df.withColumn("Descripteurs ORB", orb_UDF("image"))  # application de l'udf sur le dataframe spark
print('ORB_UDF OK')

ORB_UDF OK


In [9]:
df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)
 |-- label: string (nullable = false)
 |-- Descripteurs ORB: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: integer (containsNull = true)



Une ligne correspond a une image. Dans la colonne 'Descripteurs ORB', il y a un nombre non défini de descripteurs de chaque image (liste de liste de 32 composantes entières)

In [10]:
df.show()

+--------------------+-----------------+--------------------+
|               image|            label|    Descripteurs ORB|
+--------------------+-----------------+--------------------+
|[s3a://p8-prevot-...|       zucchini_1|[[222, 253, 148, ...|
|[s3a://p8-prevot-...|       zucchini_1|[[134, 96, 198, 1...|
|[s3a://p8-prevot-...|       zucchini_1|[[52, 201, 150, 1...|
|[s3a://p8-prevot-...|       zucchini_1|[[82, 48, 118, 16...|
|[s3a://p8-prevot-...|       zucchini_1|[[162, 172, 214, ...|
|[s3a://p8-prevot-...|       cucumber_3|[[120, 160, 244, ...|
|[s3a://p8-prevot-...|       cucumber_3|[[230, 141, 226, ...|
|[s3a://p8-prevot-...|       cucumber_3|[[134, 173, 230, ...|
|[s3a://p8-prevot-...|       cucumber_3|[[54, 113, 118, 2...|
|[s3a://p8-prevot-...|       cucumber_3|[[246, 44, 246, 1...|
|[s3a://p8-prevot-...|           pear_1|[[188, 249, 2, 23...|
|[s3a://p8-prevot-...|           pear_1|[[146, 229, 118, ...|
|[s3a://p8-prevot-...|           pear_1|[[34, 1, 98, 197,...|
|[s3a://

Nous allons 'exploser' le dataframe afin d'avoir une ligne par descripteur.

In [11]:
df = df.select(df['image'], df['label'], explode(df['Descripteurs ORB']))
df = df.withColumnRenamed("col","Descripteur")
print('Explode OK')

Explode OK


In [12]:
df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)
 |-- label: string (nullable = false)
 |-- Descripteur: array (nullable = true)
 |    |-- element: integer (containsNull = true)



In [13]:
df.show()

+--------------------+----------+--------------------+
|               image|     label|         Descripteur|
+--------------------+----------+--------------------+
|[s3a://p8-prevot-...|zucchini_1|[222, 253, 148, 1...|
|[s3a://p8-prevot-...|zucchini_1|[252, 68, 254, 22...|
|[s3a://p8-prevot-...|zucchini_1|[188, 68, 111, 10...|
|[s3a://p8-prevot-...|zucchini_1|[184, 69, 110, 23...|
|[s3a://p8-prevot-...|zucchini_1|[130, 12, 134, 15...|
|[s3a://p8-prevot-...|zucchini_1|[236, 252, 148, 2...|
|[s3a://p8-prevot-...|zucchini_1|[128, 140, 6, 159...|
|[s3a://p8-prevot-...|zucchini_1|[252, 68, 254, 22...|
|[s3a://p8-prevot-...|zucchini_1|[213, 69, 111, 22...|
|[s3a://p8-prevot-...|zucchini_1|[232, 189, 212, 2...|
|[s3a://p8-prevot-...|zucchini_1|[153, 69, 111, 22...|
|[s3a://p8-prevot-...|zucchini_1|[128, 141, 6, 159...|
|[s3a://p8-prevot-...|zucchini_1|[252, 68, 238, 22...|
|[s3a://p8-prevot-...|zucchini_1|[194, 188, 212, 2...|
|[s3a://p8-prevot-...|zucchini_1|[184, 69, 110, 23...|
|[s3a://p8

Pour réaliser l'ACP, il sera nécessaire de transformer le format des descripteurs (liste entiière) en vecteur spark.

In [14]:
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
df = df.withColumn("Descripteur-vect", list_to_vector_udf(df["Descripteur"]))
df = df.drop('Descripteur')
print('Vectorization OK')

Vectorization OK


In [15]:
df.printSchema()

root
 |-- image: struct (nullable = true)
 |    |-- origin: string (nullable = true)
 |    |-- height: integer (nullable = false)
 |    |-- width: integer (nullable = false)
 |    |-- nChannels: integer (nullable = false)
 |    |-- mode: integer (nullable = false)
 |    |-- data: binary (nullable = false)
 |-- label: string (nullable = false)
 |-- Descripteur-vect: vector (nullable = true)



In [16]:
df.show()

+--------------------+----------+--------------------+
|               image|     label|    Descripteur-vect|
+--------------------+----------+--------------------+
|[s3a://p8-prevot-...|zucchini_1|[222.0,253.0,148....|
|[s3a://p8-prevot-...|zucchini_1|[252.0,68.0,254.0...|
|[s3a://p8-prevot-...|zucchini_1|[188.0,68.0,111.0...|
|[s3a://p8-prevot-...|zucchini_1|[184.0,69.0,110.0...|
|[s3a://p8-prevot-...|zucchini_1|[130.0,12.0,134.0...|
|[s3a://p8-prevot-...|zucchini_1|[236.0,252.0,148....|
|[s3a://p8-prevot-...|zucchini_1|[128.0,140.0,6.0,...|
|[s3a://p8-prevot-...|zucchini_1|[252.0,68.0,254.0...|
|[s3a://p8-prevot-...|zucchini_1|[213.0,69.0,111.0...|
|[s3a://p8-prevot-...|zucchini_1|[232.0,189.0,212....|
|[s3a://p8-prevot-...|zucchini_1|[153.0,69.0,111.0...|
|[s3a://p8-prevot-...|zucchini_1|[128.0,141.0,6.0,...|
|[s3a://p8-prevot-...|zucchini_1|[252.0,68.0,238.0...|
|[s3a://p8-prevot-...|zucchini_1|[194.0,188.0,212....|
|[s3a://p8-prevot-...|zucchini_1|[184.0,69.0,110.0...|
|[s3a://p8

## <u>III. Réduction dimensionnelle des descripteurs (ACP)<u><a name="section3"></a><u>

Il est alors possible de réaliser une analyse en composante principale sur 20 composantes (par exemple) afin de réduire la taille des descripteurs.

In [17]:
pca = PCA(k=20, inputCol="Descripteur-vect", outputCol="Descripteur-ACP")
pca = pca.fit(df)
df = pca.transform(df)
df = df.drop('Descripteur-vect')
print('PCA OK')

PCA OK


In [18]:
df.show()

+--------------------+----------+--------------------+
|               image|     label|     Descripteur-ACP|
+--------------------+----------+--------------------+
|[s3a://p8-prevot-...|zucchini_1|[367.946231808941...|
|[s3a://p8-prevot-...|zucchini_1|[109.782596723375...|
|[s3a://p8-prevot-...|zucchini_1|[54.5693452270751...|
|[s3a://p8-prevot-...|zucchini_1|[100.421663261629...|
|[s3a://p8-prevot-...|zucchini_1|[493.223592675236...|
|[s3a://p8-prevot-...|zucchini_1|[488.019373538996...|
|[s3a://p8-prevot-...|zucchini_1|[535.950474403000...|
|[s3a://p8-prevot-...|zucchini_1|[103.977681023866...|
|[s3a://p8-prevot-...|zucchini_1|[118.961349416184...|
|[s3a://p8-prevot-...|zucchini_1|[425.006237382828...|
|[s3a://p8-prevot-...|zucchini_1|[124.416240677825...|
|[s3a://p8-prevot-...|zucchini_1|[487.704364341379...|
|[s3a://p8-prevot-...|zucchini_1|[84.2301011644544...|
|[s3a://p8-prevot-...|zucchini_1|[493.990015016403...|
|[s3a://p8-prevot-...|zucchini_1|[107.326118706153...|
|[s3a://p8

## <u>VI. Enregistrement des résultats sur le répertoire S3<u><a name="section4"></a><u>

Nous avons réalisé les premières briques de la châine de traitement. Il faut désormais stocker sur S3 le dataframe afin de pouvoir y accéder de nouveau pour la suite.

In [20]:
df.write.parquet(path="s3a://p8-prevot-2/results/", mode="overwrite")
print('Write OK')

Write OK
