# Parcours Datascience - Projet 8: Déployez un modèle dans le Cloud

Etudiant: Vincent GAGNOT

# 2- Traitement sur le cloud

En 2e étape, je transfère le traitement élaboré dans la première partie dans un environnement Cloud.  
La solution technique adoptée se compose de:
 - AWS S3
 - AWS EMR
 - AWS EC2
  
Le traitement se fait via JupyterHub, directement dans EC2. Ce traitement n'est pas complètement exécutable hors de cet environnement.

## a) Paramétrage du cluster

Le paramétrage est décrit en annexe de la présentation powerpoint du projet.  
Lors du paramétrage, on demande l'installation des packages nécessaires à notre travail (dont Spark).

## b) Démarrage de l'environnement et contrôle des packages

On commence par démarrer l'environnement Spark en validant une cellule vide.

In [1]:
# Cellule vide pour démarrer Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1629205267660_0001,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Dans notre cas, les packages nécessaires ont déjà été installés, mais si nécessaire, on peut en installer de nouveaux.  
Pour le faire, ou simplement pour contrôler les packages installés, je permets l'usage d'environnements virtuels.


In [2]:
%%configure -f
{
    "conf": {
        "spark.pyspark.python": "python3",
        "spark.pyspark.virtualenv.enabled": "true",
        "spark.pyspark.virtualenv.type":"native",
        "spark.pyspark.virtualenv.bin.path":"/usr/bin/virtualenv"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1629205267660_0002,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1629205267660_0002,pyspark,idle,Link,Link,✔


On peut, à tout moment, afficher les détails sur la session en cours, le lien sur l'interface utilisateur de Spark ou vers le log.

In [3]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1629205267660_0002,pyspark,idle,Link,Link,✔


On peut lister les packages présents, installés par défaut ou par mon bootstrap.

In [4]:
sc.list_packages()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package                    Version           
-------------------------- ------------------
absl-py                    0.12.0            
aiobotocore                1.3.3             
aiohttp                    3.7.4.post0       
aioitertools               0.8.0             
astor                      file-astor-VERSION
astunparse                 1.6.3             
async-timeout              3.0.1             
attrs                      21.2.0            
aws-cfn-bootstrap          2.0               
beautifulsoup4             4.9.3             
bleach                     3.3.0             
boto                       2.49.0            
botocore                   1.20.106          
cachetools                 4.1.1             
certifi                    2020.12.5         
chardet                    4.0.0             
click                      7.1.2             
cryptography               3.2.1             
docutils                   0.14              
flatbuffers                1.12   

La commande pour installer de nouveaux packages serait la suivante:

In [5]:
# sc.install_pypi_package("Package")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## c) Import des packages

In [6]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os
import pyspark
import tensorflow as tf

from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2
from tensorflow.keras.models import Model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## d) Construction des paths et lecture des images

On travaille uniquement sur S3, où on a déjà chargé les images sur lesquelles on travaillera (plus de 20000 images).  
On créé 3 chaines de caractères, désignant les adresses du bucket, des données d'entrée et de celles de sortie.

In [18]:
Path = 's3://bucketocproj8vgag'
Path_Data = Path+'/Data_OC_P8'
Path_Output = Path+'/Output'

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On appelle les images.

In [19]:
images = spark.read.format("binaryFile")\
.option("pathGlobFilter", "*.jpg")\
.option("recursiveFileLookup", "true")\
.load(Path_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

On créé les étiquettes, comme dans la première partie.

In [20]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+----------+
|                path|   modificationTime|length|             content|     label|
+--------------------+-------------------+------+--------------------+----------+
|s3://bucketocproj...|2021-08-17 08:45:53|  6557|[FF D8 FF E0 00 1...|Clementine|
|s3://bucketocproj...|2021-08-17 10:31:49|  6549|[FF D8 FF E0 00 1...|     Cocos|
|s3://bucketocproj...|2021-08-17 08:45:15|  6541|[FF D8 FF E0 00 1...|Clementine|
|s3://bucketocproj...|2021-08-17 08:44:59|  6540|[FF D8 FF E0 00 1...|Clementine|
|s3://bucketocproj...|2021-08-17 10:32:16|  6536|[FF D8 FF E0 00 1...|     Cocos|
+--------------------+-------------------+------+--------------------+----------+
only showing top 5 rows

## e) Préparation du preprocessing

J'applique de nouveau le preprocessing proposé sur la page databricks.  
Voici le modèle, et la couche de sortie.

In [12]:
model = ResNet50()
model = Model(inputs=model.inputs, outputs=model.layers[-2].output)
model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50_weights_tf_dim_ordering_tf_kernels.h5
Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
conv1_pad (ZeroPadding2D)       (None, 230, 230, 3)  0           input_1[0][0]                    
__________________________________________________________________________________________________
conv1_conv (Conv2D)             (None, 112, 112, 64) 9472        conv1_pad[0][0]                  
__________________________________________________________________________________________________
conv1_bn (BatchNormalization)   (None, 112, 112, 64) 256

A nouveau, je construis la séquence d'opérations à exécuter pour le traitement.

In [13]:
bc_model_weights = spark.sparkContext.broadcast(model.get_weights())

def model_fn():
    """
    Returns a ResNet50 model with top layer removed and broadcasted pretrained weights.
    """
    model = ResNet50()
    model = Model(inputs=model.inputs, outputs=model.layers[-2].output)
    model.set_weights(bc_model_weights.value)
    return model

def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).
  
    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                                is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



## f) Application du preprocessing

L'application se lance de la même manière qu'en local.

In [23]:
features_df = images.repartition(16).select(col("path"), col("label"), featurize_udf("content").alias("features"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
features_df.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------+--------------------+
|                path|    label|            features|
+--------------------+---------+--------------------+
|s3://bucketocproj...|    Cocos|[0.03943967, 0.03...|
|s3://bucketocproj...|Mandarine|[0.9072441, 0.109...|
|s3://bucketocproj...|Mandarine|[1.384338, 0.0821...|
|s3://bucketocproj...|Mandarine|[1.3363504, 0.377...|
|s3://bucketocproj...|Mandarine|[1.921056, 0.0217...|
+--------------------+---------+--------------------+
only showing top 5 rows

## g) Ecriture des données de sortie

On écrit les données, cette fois, dans S3.

In [26]:
features_df.write.mode("overwrite").parquet(Path_Output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## h) Lecture et examen des données de sortie

In [30]:
df_results = pd.read_parquet(Path_Output, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
df_results.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                           features
0  s3://bucketocproj8vgag/Data_OC_P8/Mandarine/44...  ...  [1.3105799, 0.5568161, 0.78942746, 0.008736156...
1  s3://bucketocproj8vgag/Data_OC_P8/Nectarine/r_...  ...  [0.103881106, 0.47426072, 0.031109279, 0.01188...
2  s3://bucketocproj8vgag/Data_OC_P8/Cocos/108_10...  ...  [0.010759027, 0.14477642, 0.0, 0.0, 0.09264878...
3  s3://bucketocproj8vgag/Data_OC_P8/Clementine/r...  ...  [1.6197768, 0.43258932, 0.035666835, 0.0, 0.07...
4  s3://bucketocproj8vgag/Data_OC_P8/Cocos/r_305_...  ...  [0.0, 0.0, 1.2245523, 0.0, 0.0, 0.0, 0.0158153...

[5 rows x 3 columns]

On retrouve le meme type de résultats qu'en local.

In [32]:
len(df_results.loc[0, 'features'])

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

2048

La dimension du vecteur retourné est celle qu'on attendait.

# Conclusion

On a obtenu une sortie au format attendu, en un temps record malgré le volume de données. 