# Projet 8 - Openclassrooms - Déployer un modèle dans le cloud

## Import

In [13]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession, SQLContext
from pyspark.mllib.clustering import KMeans

import io
from io import StringIO
import os
import sys
import time

import numpy as np
import pandas as pd

import cv2
from PIL import Image


import boto.s3
import boto3


import sagemaker
from sagemaker import get_execution_role
import sagemaker_pyspark

# Définition des fonctions de pré-processing

In [14]:
# Renvoie la catéagorie d'une image à partir de son chemin  
# Entrée : chaine de caractères, contient le chemin complet de l'image 
# Sortie: tuple de chaines de caratères  catégorie, nom du fichier

def parse_categorie(path):
    if len(path) > 0:
        return path.split('/')[-1],path.split('/')[-2],
    else:
        return 'None','None'
    

# Renvoie la liste des descripteurs ORB d'une image format binaire
# Entrée: image lue en binaire dans le rdd
# Sortie: tuple categorie, tableau de descripteurs

def extract_orb_features(imgfile_imgbytes):

    try:
        imgfilename, imgbytes = imgfile_imgbytes
        #print(imgfilename)
        #print(imgbytes)
        nom, categorie = parse_categorie(imgfilename)
        #print(categorie)
        # convertion  binaire en RGB image
        img2 = Image.open(io.BytesIO(imgbytes))
        #print(img2)
        #conversion  Image en tableau  numpy
        img_np = np.array(img2)
        #print(img_np)
        orb = cv2.ORB_create()
        kp, descriptors = orb.detectAndCompute(img_np, None)

        return nom, categorie, descriptors
    except Exception:
        return 'problem','problem',''
    
# Renvoie un histogramme de fréquence du BOVW
# Entrée: Ligne d'un RDD contenant la liste des descripteurs d'une image
# Entrée: modele K-means entrainé sur l'ensemble des descripteurs
# Entrée: nombre de classes prédites par K-means
# Sortie: Ligne de RDD contenant nom (chaine), catégorie (chaine), vecteur sous forme d'une liste de 32 reels  
def bow(row, model,k):
    #entree: tables de descripteurs
    categorie = row['category']
    nom = row['name']
    feature_matrix = row['descriptors']
    bow = np.zeros(k)
    nkp = len(row['descriptors'])
    #print(nkp)

    for x in feature_matrix:
        idx = model.predict(x)
        #print(idx)
        bow[idx] += 1/nkp

    return Row(fileName=nom,category=categorie, bow=bow.tolist())


# Ecrit un dataframe spark au format parket
# Entrée: dataframme spark, chemin (chaine)
# Sortie: sauvegarde format parquet sur l'emplacement indiqué

def write_dataframe_parquet(dataframe, path_parquet):
    '''Enregistrement du spark dataframe au format parquet au chemin specifie'''
    try:
        #start = time.time()
        dataframe.write.format("parquet").mode('overwrite').save(path_parquet)
        print('Enregistrement effectué.')
            

        #print('Temps de sauvegarde : {} secondes'.format(time.strftime('%S', time.gmtime(time.time()-start))))
    except:
        print('L\'Enregistrement a échoué.')
    
    return True

# Définition de l'environement Spark

In [15]:
role = get_execution_role()

# Configure Spark to use the SageMaker Spark dependency jars
jars = sagemaker_pyspark.classpath_jars()

classpath = ":".join(sagemaker_pyspark.classpath_jars())

# See the SageMaker Spark Github to learn how to connect to EMR from a notebook instance
spark = SparkSession.builder.config("spark.driver.extraClassPath", classpath)\
    .master("local[*]").getOrCreate()
    
spark

# Variables globales

In [16]:
# Chemin d'acces des données sur S3
# Seul un échantillon des données de base est utilisé

sample_img_dir = "s3a://p8muriel/test2/*"
path_write = "s3a://p8muriel/test2_bovw.parquet"

# Pré-processing

## Chargement des données

Un échantillon des données sont stockées sur un bucket "p8muriel" sur S3. Ni les chemins, ni les noms de fichiers ne doivent contenir des espaces.  
-> faire un script préalable au chargement des données sur S3  

In [50]:
print("Chargement des images")
#Chargement des images dans un RDD sous forme binaire
t = time.time()
images = spark.sparkContext.binaryFiles(sample_img_dir)
print('...Temps de chargement des images : {}'.format(time.strftime("%H:%M:%S", time.gmtime(time.time()-t))))

Chargement des images
...Temps de chargement des images : 00:00:00


In [66]:
images.take(2)

[('s3a://p8muriel/test2/Salak/0_100.jpg',
  b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00\x00\x01\x00\x01\x00\x00\xff\xdb\x00C\x00\x02\x01\x01\x01\x01\x01\x02\x01\x01\x01\x02\x02\x02\x02\x02\x04\x03\x02\x02\x02\x02\x05\x04\x04\x03\x04\x06\x05\x06\x06\x06\x05\x06\x06\x06\x07\t\x08\x06\x07\t\x07\x06\x06\x08\x0b\x08\t\n\n\n\n\n\x06\x08\x0b\x0c\x0b\n\x0c\t\n\n\n\xff\xdb\x00C\x01\x02\x02\x02\x02\x02\x02\x05\x03\x03\x05\n\x07\x06\x07\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\n\xff\xc0\x00\x11\x08\x00d\x00d\x03\x01"\x00\x02\x11\x01\x03\x11\x01\xff\xc4\x00\x1f\x00\x00\x01\x05\x01\x01\x01\x01\x01\x01\x00\x00\x00\x00\x00\x00\x00\x00\x01\x02\x03\x04\x05\x06\x07\x08\t\n\x0b\xff\xc4\x00\xb5\x10\x00\x02\x01\x03\x03\x02\x04\x03\x05\x05\x04\x04\x00\x00\x01}\x01\x02\x03\x00\x04\x11\x05\x12!1A\x06\x13Qa\x07"q\x142\x81\x91\xa1\x08#B\xb1\xc1\x15R\xd1\xf0$3br\x82\t\n\x16\x17\x18\x19\x1a%&\'()*456789:CDEFGHIJSTUVWXYZcdefghijstuvwxyz\x83\x84\x85\x86\x8

# Formation du bag of visual word

In [51]:
print("Extraction des descripteurs par image")
start = time.time()
# formation rdd contenant la categorie et le tableau de descripteurs en supprimant les individus sans descripteurs
tuples_decriptors = images.map(lambda img: extract_orb_features(img)).filter(lambda x: x[2] is not None)
#transformation du rdd en rdd contenant des row pour faciliter sa manipulation
cat_descriptors = tuples_decriptors\
    .map(lambda x: (Row(name=x[0],category=x[1], descriptors=x[2])))
print('...Temps de traitement : {}'.format(time.strftime("%H:%M:%S", time.gmtime(time.time()-t))))

Extraction des descripteurs par image
...Temps de traitement : 00:00:03


In [52]:
#Clustering des descripteurs avec KMEANS
print("Clustering des descripteurs")

Clustering des descripteurs


In [53]:
#calcul du nombre de classe avec la formule nombre de categories x 10
print("...calcul du nombre de classes")
t = time.time()
nbcat = cat_descriptors.map(lambda x: x['category']).distinct().count()
k = nbcat *10
print("K-means avec k = ",k)
print('.....Temps de traitement count : {}'.format(time.strftime("%H:%M:%S", time.gmtime(time.time()-t))))

...calcul du nombre de classes
K-means avec k =  150
.....Temps de traitement count : 00:10:42


In [54]:
print("...flatmap des descripteurs dans un rdd")
t = time.time()
# rdd contenant un descripteur par ligne
cat_features = tuples_decriptors.flatMap(lambda x: [(x[0],x[1],w) for w in x[2]])
cat_desc = cat_features\
    .map(lambda x: (Row(name=x[0],category=x[1], features=x[2])))
descriptors = cat_desc.map(lambda x: x['features'])
print('.....Temps de traitement flatmap : {} '.format(time.strftime("%H:%M:%S", time.gmtime(time.time()-t))))

...flatmap des descripteurs dans un rdd
.....Temps de traitement flatmap : 00:00:00 


In [55]:
print('...clustering des descripteurs de kMeans')
t = time.time()      
#entrainement du modele
model = KMeans.train(descriptors, k, maxIterations=10, initializationMode="random")
print('.....Temps de traitement kMeans : {}'.format(time.strftime("%H:%M:%S", time.gmtime(time.time()-t))))

...clustering des descripteurs de kMeans
.....Temps de traitement kMeans : 00:09:52


In [35]:
print("...formation du rdd bag of visual word")
t = time.time()    
rdd_bow = cat_descriptors.map(lambda x: bow(x,model,k))
print('.....Temps de traitement bow : {}'.format(time.strftime("%H:%M:%S", time.gmtime(time.time()-t))))

...formation du rdd bag of visual word
.....Temps de traitement bow : 00:00:00


## Affichage du contenu du RDD

Juste pour information dans le cadre du projet, à ne pas faire en production

In [45]:
rdd_bow.take(2)

[Row(bow=[0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.043478260869565216, 0.021739130434782608, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.043478260869565216, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.021739130434782608, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.043478260869565216, 0.0, 0.043478260869565216, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.23913043478260865, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.021739130434782608, 0.021739130434782608, 0.021739130434782608, 0.0, 0.0, 0.043478260869565216, 0.0, 0.0, 0.021739130434782608, 0.021739130434782608, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.043478260869565216, 0.021739130434782608, 0.0, 0.0, 0.0, 0.0, 0.043478260869565216, 0.0, 0.021739130434782608, 0.0, 0.0, 0.0, 0.0, 0.0, 0.021739130434782608, 0.0, 0.0, 0.0, 0.0, 0.0, 0.06521739130434782, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.021739130434782608, 0.02173

# Enregistrement des données au format parquet

In [36]:
print("...conversion du rdd BOVW en spark dataframe")
t = time.time()  
df = rdd_bow.toDF()
print('.....Temps de traitement toDF : {}'.format(time.strftime("%H:%M:%S", time.gmtime(time.time()-t))))

...conversion du rdd BOVW en spark dataframe
.....Temps de traitement toDF : 00:00:02


In [46]:

print("...enregistrement format parquet")
t = time.time()  
df = rdd_bow.toDF()

#path_write = "s3a://p8muriel/test_bovw.parquet"

write_dataframe_parquet(df,path_write)
print('.....Temps de traitement enregistrement parquet : {}'.format(time.strftime("%H:%M:%S", time.gmtime(time.time()-t))))

...enregistrement format parquet
Enregistrement effectué.
.....Temps de traitement enregistrement parquet : 00:10:13


# Aperçu des données après réduction de dimension

In [37]:
df.show()

+--------------------+--------+-----------+
|                 bow|category|   fileName|
+--------------------+--------+-----------+
|[0.0, 0.0, 0.0, 0...|   Salak|  0_100.jpg|
|[0.0, 0.047619047...|   Salak|100_100.jpg|
|[0.0, 0.047619047...|   Salak|101_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|102_100.jpg|
|[0.0, 0.055555555...|   Salak|103_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|104_100.jpg|
|[0.0, 0.043478260...|   Salak|105_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|106_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|107_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|108_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|109_100.jpg|
|[0.0, 0.076923076...|   Salak|110_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|111_100.jpg|
|[0.0, 0.071428571...|   Salak|112_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|113_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|114_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|115_100.jpg|
|[0.0, 0.0, 0.0416...|   Salak|116_100.jpg|
|[0.0, 0.0, 0.0, 0...|   Salak|117_100.jpg|
|[0.0, 0.029411764...|   Salak|1

In [38]:
df.show(2, False)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

## Schema

In [44]:
df.schema

StructType(List(StructField(bow,ArrayType(DoubleType,true),true),StructField(category,StringType,true),StructField(fileName,StringType,true)))

In [42]:
df.printSchema()

root
 |-- bow: array (nullable = true)
 |    |-- element: double (containsNull = true)
 |-- category: string (nullable = true)
 |-- fileName: string (nullable = true)



## Ecriture en csv pour comparaison avec parquet

In [49]:
print("...enregistrement format csv")
t = time.time() 
df.toPandas().to_csv("s3a://p8muriel/test2_bow_file.csv", header=True)
print('.....Temps de traitement enregistrement csv : {}'.format(time.strftime("%H:%M:%S", time.gmtime(time.time()-t))))

...enregistrement format csv
.....Temps de traitement enregistrement csv : 00:10:02


In [64]:
spark.sparkContext

In [62]:
spark.app.name

AttributeError: 'SparkSession' object has no attribute 'app'