# **Déployer un modèle dans le cloud**
*Sofia Chevrolat (Janvier 2021)*
___
Cette étude vise à développer dans un environnement Big Data une première chaîne de traitement des données qui comprendra le préprocessing des données.

Le but est ici de mettre en place les premières briques de traitement qui serviront lorsqu’il deviendra nécessaire de passer à l’échelle en termes de volume de données.

___
_**Remerciements**:<br>
Merci à mon compagnon [J. Duplan](https://www.linkedin.com/in/julian-duplan-64844a41/) pour les discussions intéressantes.<br>
Merci également à mon mentor [Samia Drappeau](https://www.linkedin.com/in/samiadrappeau) pour les échanges d'idées, les conseils et les encouragements !_
___

Ce notebook est organisé comme suit:

**0. Mise en place**
- 0.1 Chargement des librairies et fonctions utiles
- 0.2 Définition des paramètres
- 0.3 Définition des fonctions utiles

**1. Chargement et description du jeu de données**
- 1.1 Description du jeu de données
- 1.2 Chargement & labellisation des données

**2. Réduction de dimensions des données**
- 2.1 Préparation du modèle pour extraction des features
- 2.2 Extraction des features
- 2.3 Visualisation des features

**3. Enregistrement des données prétraitées**

___
### 0. MISE EN PLACE

Dans cette première étape, le cadre de travail est mis en place, c'est-à-dire :
- Les librairies et packages Python nécessaires sont chargés
- Les fonctions utiles sont définies
___

___
#### 0.1 CHARGEMENT DES LIBRAIRIES

In [1]:
import io
import os
import time
import numpy as np
import pandas as pd

In [2]:
import pyspark
from pyspark.sql.functions import input_file_name
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from pyspark.sql.functions import col, pandas_udf, PandasUDFType

In [3]:
from PIL import Image

In [4]:
import tensorflow as tf
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

___
#### 0.2 DÉFINITION DES PARAMÈTRES

___
##### 0.2.1 Arguments commande spark-submit

Ces arguments seront utilisés pour l'exécution des commandes Spark.

In [5]:
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 \
                                    --conf spark.driver.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true \
                                    --conf spark.executor.extraJavaOptions=-Dio.netty.tryReflectionSetAccessible=true \
                                    pyspark-shell'

___
##### 0.2.2 Arguments S3

Les données sont stockées sur un bucket S3 protégé par une combinaison clé + rôle IAM.
Ces informations sont stockées sur un fichier au format json et seront à fournir pour pouvoir accéder aux données lors des étapes de :

- chargement des données brutes depuis le bucket S3
- sauvegarde des données prétraitées dans le bucket S3

In [6]:
bucket_name = 'mltrainingfruitimages'
credentials = pd.read_json(".access/AWS_access.json")

AWS_ACCESS_KEY_ID = credentials["S3_admin"].loc["Access_key_ID"]
AWS_SECRET_ACCESS_KEY = credentials["S3_admin"].loc["Secret_access_key"]
AWS_REGION = credentials["S3_admin"].loc["Region"]

___
##### 0.2.3 Création et configuration contexte et session Spark

La session Spark est initialisée ici, notamment avec les paramètres d'identification permettant d'accéder en lecture et écriture au bucket S3 contenant les données.

In [7]:
sc=pyspark.SparkContext()
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
hadoop_conf.set("com.amazonaws.services.s3.enableV4", "true")
hadoop_conf.set("fs.s3a.access.key", AWS_ACCESS_KEY_ID)
hadoop_conf.set("fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)
hadoop_conf.set("fs.s3a.endpoint", "s3." + AWS_REGION + ".amazonaws.com")

In [8]:
sparkSQL_session = pyspark.sql.SparkSession(sc)

In [9]:
# Pandas UDFs on large records (e.g., very large images) can run into
# Out of Memory (OOM) errors.
# To try and prevent such errors, we can try reducing the Arrow 
# batch size via 'maxRecordsPerBatch'.

sparkSQL_session.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

___
#### 0.3 DÉFINITION DES FONCTIONS UTILES

In [10]:
def parse_categorie(path):
    '''
        Returns an image label (category) 
        given its path
        
        Parameters
        ----------------
        path : StringType
               The path of an image

        Returns
        ---------------
        _    : string
               The label (category) of the image
    '''
    
    return path.split('/')[-2] if len(path) > 0 else ''

In [11]:
def load_data(path_img, spark_session):
    '''
        Loads the images contained in subdirectories at path_img 
        in spark_session in a Spark dataframe.
        
        Parameters
        ----------------
        path_img      : string
                        The path of the repository containing 
                        the subdirectories with images
                        
        spark_session : SparkSession
                        The running SparkSession with access
                        to the repository at path_img

        Returns
        ---------------
        _    : Spark Dataframe with 5 columns: 
               - path
               - modificationTime
               - length
               - content
               - category
               
    '''
    
    #time counter
    start = time.time()
    
    # Reading images from source
    
    df_img = spark_session.read.format("binaryFile")\
                          .option("pathGlobFilter", "*.jpg") \
                          .option("recursiveFileLookup", "true") \
                          .load(path_img)

    print('Loading complete.')
    print('Labelling images...')
        
    # Adding "path" column with image path
    df_img = df_img.withColumn("path", input_file_name())
    
    # Labelling images using their path
    udf_categorie = udf(parse_categorie, StringType())
    df_img = df_img.withColumn('category', udf_categorie('path'))
    
    print('Operation complete.')
    print('Elapsed Time: {}s'.format(time.strftime('%S',
                                                    time.gmtime(time.time()-start))))
    
    return df_img

In [12]:
def model_fn():
    '''
        Returns a ResNet50 model with top layer removed 
        and broadcasted pretrained weights.
        
        
        Parameters
        ----------------
        _      
        

        Returns
        ---------------
        model : ResNet50 model 
                The model has its top layer removed 
                and pretrained weights. 
        
    '''
    
    model = ResNet50(weights=None, include_top=False)
    model.set_weights(bc_model_weights.value)
    
    return model

In [13]:
def preprocess(content):
    '''
        Preprocesses raw image bytes for prediction.
        
        Parameters
        ----------------
        content : PandasUDF
                  The PandasUDF containing the images in 
                  binary format to be preprocessed
        

        Returns
        ---------------
        _       : numpy array
                  Preprocessed numpy.array or a tf.Tensor with type float32.
                  The images are converted from RGB to BGR, then each color 
                  channel is zero-centered with respect to the ImageNet dataset,
                  without scaling.
    
    '''
    
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    
    return preprocess_input(arr)

In [14]:
def featurize_series(model, content_series):
    '''
        Featurize a pd.Series of raw images using the input model.
        
        Parameters
        ----------------
        model          : ResNet50 model
        
        content_series : PandasUDF
                         The PandasUDF containing the images in 
                         binary format to be preprocessed
        

        Returns
        ---------------
        _       : Pandas series
                  Containing the image features
        
    '''
    
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark
    # Dataframes.
    
    output = [p.flatten() for p in preds]
    
    return pd.Series(output)

In [15]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
        This method is a Scalar iterator pandas UDF wrapping the 
        featurization function.
        The decorator specifies that this returns a Spark Dataframe column of
        type ArrayType(FloatType).
        
        Parameters
        ----------------
        content_series_iter: Pandas UDF scalar iterator
                             An iterator over batches of data, where 
                             each batch is a pandas Series of image data.
        

        Returns
        ---------------
        _       : SparkDataframe column of type ArrayType(FloatType) 
                  As specified by the decorator. Contains the features of 
                  each image.
        
        :param 
    '''
    
    model = model_fn()
    
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



___
### I.  DESCRIPTION ET CHARGEMENT DU JEU DE DONNÉES

___
#### 1.1  DESCRIPTION DU JEU DE DONNÉES

Les données consistent en un total de 90483 images de 100 * 100 pixels, parmi 131 classes de fruits et légumes.

Les images sont stockées dans un répertoire, sur un espace de stockage S3 à accès protégé, sur la plateforme AWS.
Elles sont réparties dans des sous-dossiers portant le nom de la classe du fruit ou du légume qu'elles représentent.

<u>_Arborescence_</u> : 

**mltrainingfruitimages** (Bucket S3)
- sourceimages
  - AppleBraeburn
    - 0_100.jpg
    - 1_100.jpg
    - ...
  - Apricot
    - 0_100.jpg
    - 1_100.jpg
    - ...
            
  - Avocado
    - 0_100.jpg
    - 1_100.jpg
    - ...

___
#### 1.2 CHARGEMENT & LABELLISATION DES DONNÉES

In [16]:
spark_df = load_data("s3a://"+bucket_name+"/sourceimages/*", sparkSQL_session)

Loading complete.
Labelling images...
Operation complete.
Elapsed Time: 06s


In [17]:
spark_df.show(5)

+--------------------+-------------------+------+--------------------+-------------+
|                path|   modificationTime|length|             content|     category|
+--------------------+-------------------+------+--------------------+-------------+
|s3a://mltrainingf...|2021-01-18 19:00:30|  5656|[FF D8 FF E0 00 1...|AppleBraeburn|
|s3a://mltrainingf...|2021-01-18 19:00:30|  5613|[FF D8 FF E0 00 1...|AppleBraeburn|
|s3a://mltrainingf...|2021-01-18 19:00:30|  5606|[FF D8 FF E0 00 1...|AppleBraeburn|
|s3a://mltrainingf...|2021-01-18 19:00:30|  5602|[FF D8 FF E0 00 1...|AppleBraeburn|
|s3a://mltrainingf...|2021-01-18 19:00:30|  5594|[FF D8 FF E0 00 1...|AppleBraeburn|
+--------------------+-------------------+------+--------------------+-------------+
only showing top 5 rows



Chaque image est maintenant labellisée suivant sa catégorie.

___
### II. RÉDUCTION DE DIMENSIONS DES DONNÉES

Nous utilisons ici une approche de featurization de nos images par transfer learning.

Cela consiste à extraire les features les plus pertinentes pour la classification de nos iamges en utilisant un modèle de deep learning pré-entraîné sur de la classification d'image, auquel on enlève la dernière couche - celle qui classifie - afin d'obtenir en sortie un tenseur des features les plus significatives de nos images.

Ce tenseur pourra ensuite être fourni à un autre modèle de classification, qui sera ainsi adapté à nos classes.

___
#### 2.1 PRÉPARATION DU MODÈLE POUR EXTRACTION DES FEATURES

Nous utilisons ici un modèle ResNet50 pré-entraîné pour de la classification d'images, auquel nous retirons la dernière couche.

In [18]:
# PREPARING THE MODEL

model = ResNet50(include_top=False)
model.summary() # check that the top layer has been removed

Model: "resnet50"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, None, None,  0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, None, None, 3 0           input_1[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, None, None, 6 9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
conv1_bn (BatchNormalization)   (None, None, None, 6 256         conv1_conv[0][0]                 
___________________________________________________________________________________________

Les poids du modèle obtenu sont diffusés pour être exploités durant la phase de featurization.

Ceci est fait pour éviter le coût de passer ces poids de fonction en fonction, et donc des copies / destructions successives qui auraient lieu dans ce cas.

In [19]:
bc_model_weights = sc.broadcast(model.get_weights())

___
#### 2.2. EXTRACTION DES FEATURES

Les features les plus significatives pour la classification sont extraites de nos images et sauvegardées dans une dataframe Spark.

In [20]:
features_df = spark_df.repartition(16)\
                      .select(col("path"), col("category"),
                              featurize_udf("content").alias("features"))

___
#### 2.3. VISUALISATION DES FEATURES

In [21]:
# We persist the features_df to avoid the calculation having to 
# be done for each of the subsequent result type calls.

features_df.persist()

DataFrame[path: string, category: string, features: array<float>]

In [22]:
features_df.count()

1218

In [23]:
features_df.columns

['path', 'category', 'features']

In [24]:
features_df.show(5)

+--------------------+-------------+--------------------+
|                path|     category|            features|
+--------------------+-------------+--------------------+
|s3a://mltrainingf...|AppleBraeburn|[0.0, 0.0, 0.0, 0...|
|s3a://mltrainingf...|AppleBraeburn|[0.0, 0.0, 0.0, 0...|
|s3a://mltrainingf...|AppleBraeburn|[0.0, 0.0, 0.0, 0...|
|s3a://mltrainingf...|AppleBraeburn|[0.0, 0.0, 0.0, 0...|
|s3a://mltrainingf...|      Avocado|[0.0, 0.0, 0.0, 0...|
+--------------------+-------------+--------------------+
only showing top 5 rows



___
### III. ENREGISTREMENT DES DONNÉES PRÉTRAITÉES

Les données prétraitées sont stockées sur le cloud pour pouvoir être par la suite exploitées par le modèle d'apprentissage.

Le format "parquet" est privilégie pour son efficacité par rapport aux formats JSON ou CSV.

In [25]:
# Saving data to S3 bucket in parquet format

features_df.write\
           .mode('overwrite') \
           .parquet("s3a://"+bucket_name+"/img_features_export.parquet")