<h1>Projet 8 du parcours "Data Scientist" : Déployez un modèle dans le cloud</h1>

<h2>Sommaire</h2>


1) Introduction et présentation de l'architecture utilisée<br/>
2) Préprocessing et réduction de dimension des données




<h2>Introduction et présentation de l'architecture utilisée</h2>

Voici mon projet 8 du parcours "Data Scientist", à savoir le déploiement d'un modèle dans le cloud. Ainsi, nous allons déployer un modèle de réduction de dimensions sur des images de fruits sur AWS, dans le but de pouvoir traiter un nombre très important d'images sans faire face à des limitations. Ici, nous avons ainsi un jeu de données limité pour effectuer notre modèle, mais ce jeu pourrait être 100 à 1000 fois plus important dans le futur.<br/>

Pour ce faire, nous utilisons donc une architecture de calculs distribués, composée de :
- Un espace de stockage S3 contenant les différentes images du projet réparties en 3 dossiers : Traning, Test, et multiple_test-fruits. Ici, pour entraîner le modèle, nous nous servirons exclusivement des images Test.
- Un serveur de calculs EC2 qui permettra d'exécuter notre modèle, et lié au S3 grâce à Hadoop. Ici, nous avons choisi une instance payante, mais de petite taille, à savoir t2.xlarge, pour à la fois faire tourner rapidement notre modèle, mais également prévoir un peu de marge avec l'augmentation future du nombre d'images.

Pour permettre le bon fonctionnement de notre EC2, il est tout d'abord nécessaire d'installer quelques packages et librairies :
- Java version 8
- Python 3.6 et pip version python 3
- Différentes librairies Python3 : 
 - numpy
 - pandas
 - jupyter pour créer ce notebook
 - tensorflow de version >2. Ici, nous avons choisi la 2.1.0 qui est une version stable du package
 - pyarrow de version comprise entre 0.10 et 0.14 (ici la 0.13) pour correspondre aux prérequis de Tensorflow et de Pyspark.
 - Spark et Hadoop (ici Spark 2.4.5 prebuilt for Hadoop 2.7) dans une version stable et fonctionnant avec notre version de Java
Après ces installations, les variables d'environnement de Java, de Spark, de Pyspark et d'ouverture du notebook par PySpark ont été définies dans ~/.bashrc.

Pour lancer ce notebook, il faut ainsi lancer Pyspark en utilisant ces paramètres :

`$SPARK_HOME/bin/pyspark --master local[*,4] --packages org.apache.hadoop:hadoop-aws:2.7.3,com.amazonaws:aws-java-sdk:1.7.4`

--master local[*,4] correspond à un lancement de Pyspark en local, sur tous les coeurs du processeur disponibles et jusqu'à 4 fois en cas d'erreur. Les deux packages indiqués, Hadoop et Java, sont ceux nécessaires à la lecture des fichiers de AWS S3 dans Spark : ici nous avons choisi respectivement les versions 2.7.3 et 1.7.4 pour des raisons de compatibilité. 

Une fois cette commande lancée, nous pouvons utiliser Jupyter dans le navigateur, en utilisant le DNS public de l'instance EC2 et le port 8888


<h2>Préprocessing et réduction de dimension des données</h2>

In [None]:
import os
from datetime import datetime

# Find the localisation of Spark/PySpark on EC2 local server
import findspark 
findspark.init()

from subprocess import call
import copy

# Import of ML and Data Science necessary librairies
import numpy as np
import pandas as pd
import tensorflow as tf
import shutil
from tensorflow.keras.applications.mobilenet import preprocess_input
from tensorflow.keras.models import load_model

from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, DoubleType
from pyspark.sql.functions import col, pandas_udf, lit

Après avoir importé les librairies nécessaires et dont nous avons parlé auparavant, il est nécessaire d'instancier une session de Spark (ici Pyspark vu qu nous sommes en Python), à la fois pour exécuter notre modèle, mais aussi déjà pour importer nos images stockées sur S3. Plusieures données sont ainsi à considérer :
- indiquer les clés d'accès et secrètes afin d'accéder aux différents espaces (ici retirées pour des raisons de sécurité)
- indiquer l'emplacement du dossier dans le bucket S3 contenant les différentes images
- limiter la mémoire RAM utilisée afin qu'elle ne sature pas et ne fasse pas planter notre exécution. Pour le nombre actuel d'images utilisées, cela ne sera pas important, mais nous préférons être prévoyant dans le cadre de l'augmentation des données à traiter.
- "com.amazonaws.services.s3.enableV4" est nécessaire dans le cadre de l'import de données depuis S3 provenant de la région eu-west-3, à savoir Paris.
- Les autres paramètres suivant la configuration de la session permettent là encore l'impotration du stockage S3 dans EC2.

In [None]:
# Save the keys to access to S3
AWSAccessKeyId='yourkey'
AWSSecretKey='yourkey'

# Indicate the localisation of S3 bucket
S3dir = 's3a://projet8bucket'

#Creation of the Spark Session 
spark = SparkSession.builder.config("spark.python.worker.reuse", "False") \
                            .appName('dim_red') \
                            .getOrCreate()

spark.sparkContext.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", AWSAccessKeyId)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", AWSSecretKey)
spark._jsc.hadoopConfiguration().set("fs.s3a.impl","org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("com.amazonaws.services.s3.enableV4", "true")
spark._jsc.hadoopConfiguration().set("fs.s3a.aws.credentials.provider","org.apache.hadoop.fs.s3a.BasicAWSCredentialsProvider")
spark._jsc.hadoopConfiguration().set("fs.s3a.endpoint", "s3.eu-west-3.amazonaws.com")

Nous importons maintenant nos images dans un Spark Dataframe : ainsi, nous récupérons toutes les données en format image contenues dans le dossier "Test", et ce de manière récursive car nos images sont situées elles-mêmes dans des dossiers contenus dans "Test".

Nous redimensionnons nos images dans des dimensions égales, et surtout compatibles avec le modèle pré-entraîné que nous allons utiliser pour notre réduction de dimension, à savoir MobileNetV2. Ce modèle est en effet simple d'utilisation (il suffit de l'importer depuis Tensorflow) et est rapide tout en donnant de bons résultats. Des modèles comme InceptionResNetV2 auraient également pu être utilisés dans notre cas, mais ce modèle est plus lourd et aurait demandé une plus grosse infrastructure de EC2 : nous avons donc choisi ce compromis pour des raisons de coûts.

In [None]:
# Load images from S3 bucket to a Spark Dataframe
df = spark.read.format('image').load(S3dir + '/Test/*/*', recursive=True)

In [None]:
# Resize all images to required dimension of MobileNetV2 model
IMAGE_SIZE = 96
IMG_SHAPE = (IMAGE_SIZE, IMAGE_SIZE, 3)

# Import MobileNetV2 model, but only the first layers (last used for other images)
model = tf.keras.applications.MobileNetV2(input_shape=IMG_SHAPE, 
                                          include_top=False, 
                                          weights='imagenet')

# Save the model locally for future use
model.save('models', save_format='tf')

Instructions for updating:
If using Keras pass *_constraint arguments to layers.
INFO:tensorflow:Assets written to: models/assets


Nous devons maintenant définir tous les éléments de transformation des images, pour une exécution future. En effet, Pyspark n'exécute le code que lors d'un appel à l'action qui se situe à la fin du code. 

La fonction de réduction de dimensions va ainsi s'effectuer en plusieurs étapes :
- Import de l'image
- Transformation des différentes variables d'images obtenues dans le Spark Dataframe (width, height et nChannels) en des variables compatibles avec l'utilisation du modèle. D'autres solutions auraient pu être utilisées (transformation directe par une fonction de Spark par exemple), mais des erreurs nous ont contraints à coder ceci par nous-mêmes
- Ajout des images transformées dans l'entrée du modèle par la fonction "preprocess_input()"
- Redimensionnement des images aux dimensions indiquées précédemment
- Réduction de dimensions grâce aux premières couches sélectionnés du modèle sauvegardé et obtention des features d'intérêt
- Conversion de la liste des features en Series Pandas

Cette fonction classique est ensuite transformée en fonction de pandas_udf, afin de pouvoir exécuter la fonction de manière parallélisée (plusieurs images traitées en même temps)

 

In [None]:
# Function to transform the images, apply the dimesnion reduction and
# obtain the features with the saved model 
def red_dim(width, height, nChannels, data):
    # Transformation of Spark image data to Tensorflow format
    images = []
    for i in range(height.shape[0]):
        x = np.ndarray(
                shape=(height[i], width[i], nChannels[i]),
                dtype=np.uint8,
                buffer=data[i],
                strides=(width[i] * nChannels[i], nChannels[i], 1))
        images.append(preprocess_input(x))
    # Resize images
    images = np.array(tf.image.resize(images, [IMAGE_SIZE, IMAGE_SIZE]))
    # Load and apply the model
    model = load_model('models')
    preds = model.predict(images).reshape(len(width), 3 * 3 * 1280)
    
    # Return a Pandas Series with all features images 
    return pd.Series(list(preds))

# Transformation into a pandas_udf function
red_dim_udf = pandas_udf(red_dim, returnType=ArrayType(DoubleType()))

Une dernière variable a besoin d'être initialisée : celle permettant de connaître le temps au lancement du programme (du jour à la seconde). Ceci nous sera utile pour créer le dossier contenant les différents fichiers finaux de notre sortie de modèle, afin d'éviter les doublons et ainsi le plantage de notre programme.

In [None]:
today = datetime.now().strftime('%Y_%m_%d_%H_%M_%S')

Maintenant que tout a été initialisé et la fonction de transformation écrite, il ne reste plus qu'à faire un appel à action pour exécuter tout le code précédent. On créée ainsi une colonne dans notre Dataframe contenant toutes les features de chaque image, et une autre contenant ces features converties en string pour être compatibles avec un enregistrement en fichier CSV. Nous sélectionnons ensuite uniquement la colonne contenant le lien vers les images (et ainsi leur nom) et celle contenant les features converties.

Ce dataframe contenant maintenant deux colonnes va enfin être enregistré dans 5 fichiers CSV différents en parallèle : en effet, un unique fichier sera très lourd et donc plus long à traiter par la suite que plusieurs, et 350Mo environ pour chaque fichier semble un bon compromis entre le nombre de fichiers et leur poids. Ces fichiers sont contenus dans un dossier du bucket S3. 

In [None]:
# Launch the action of Pyspark application
results=df.withColumn("dim_red", red_dim_udf(col("image.width"), col("image.height"), \
                                                    col("image.nChannels"), \
                                                    col("image.data"))) \
                 .withColumn("dim_red_string", lit(col("dim_red").cast("string"))) \
                 .select("image.origin", 'dim_red_string')\
                 .repartition(5).write.csv(S3dir + '/results' + today)


"print(results.count())\n\nresults.repartition(5).write.csv(S3dir + '/results' + today)"

Ainsi, nous avons maintenant nos 5 fichiers prêts à être traités pour calculer les différentes prédictions de fruits dans le futur. Une amélioration possible serait par exemple d'utiliser une instance EMR avec l'instance EC2 car cela serait une méthode plus actuelle, mais un blocage du groupe de sécurité de notre compte a rendu ceci impossible sans créer un nouveau compte AWS, l'idée a donc été pour le moment abandonnée.