# Deploy a model in the cloud

### CONTEXT:

- "Fruits!" is a young AgriTech start-up that aims to introduce innovative solutions for fruit harvesting. Its mission is to preserve fruit biodiversity by developing treatments specific to each species of fruit, carried out by intelligent fruit-picking robots.

- As a first step, the start-up wants to raise its profile by launching a mobile application for the general public. The application will let users photograph a fruit and get relevant information about it. Beyond this feature, the company wants to raise public awareness of fruit biodiversity and to build a first version of a fruit image classification engine.

- In parallel, developing the mobile application will serve as the basis for a first version of the Big Data architecture needed to realize their vision.

#### Starting the Spark session

In [1]:
# start a Spark session

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1691403420870_0003,pyspark,idle,Link,Link,,✔



SparkSession available as 'spark'.



In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
2,application_1691403420870_0003,pyspark,idle,Link,Link,,✔


##### Installing packages
- The required packages were installed by the bootstrap process that runs when the server is created.

#### Importing libraries

In [3]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split


#### Configuring the paths for loading the images, saving the results, and applying PCA
We access our data hosted on S3 as simply as if it were local storage.

In [29]:
PATH = 's3://aws-logs-143241216037-eu-north-1'
PATH_Data = PATH+'/Test/DATAP8-OC'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)
PATH_PCA= PATH+"/PCA"


PATH:        s3://aws-logs-143241216037-eu-north-1
PATH_Data:   s3://aws-logs-143241216037-eu-north-1/Test/DATAP8-OC
PATH_Result: s3://aws-logs-143241216037-eu-north-1/Results

#### Loading the data

In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)


In [6]:
images.show(4)


+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://aws-logs-143...|2023-08-07 09:54:26|  5365|[FF D8 FF E0 00 1...|
|s3://aws-logs-143...|2023-08-07 09:54:26|  5340|[FF D8 FF E0 00 1...|
|s3://aws-logs-143...|2023-08-07 09:54:26|  5338|[FF D8 FF E0 00 1...|
|s3://aws-logs-143...|2023-08-07 09:54:26|  5331|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 4 rows
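
The `pathGlobFilter` option keeps only files whose name matches the glob, so images with another extension or capitalisation are silently left out of `images`. As an illustration only, the sketch below mimics that filter in plain Python with `fnmatch.fnmatchcase`. The helper `keep_file` and the sample paths are hypothetical, and Spark applies its own Hadoop-style glob matching, which is assumed here to be case-sensitive like this sketch.

```python
from fnmatch import fnmatchcase
import posixpath


def keep_file(path: str, pattern: str = "*.jpg") -> bool:
    """Return True when the file name (last path segment) matches the glob."""
    return fnmatchcase(posixpath.basename(path), pattern)


# Hypothetical paths, used only to exercise the filter.
sample_paths = [
    "Test/DATAP8-OC/COCOS/3_100.jpg",   # kept
    "Test/DATAP8-OC/COCOS/3_100.JPG",   # dropped: different case
    "Test/DATAP8-OC/COCOS/notes.txt",   # dropped: not an image
]
kept = [p for p in sample_paths if keep_file(p)]
```

If the dataset may contain `.jpeg` or upper-case extensions, it is worth listing the bucket once and comparing the file count with `images.count()`.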

I add a `label` column, taken from the name of the folder that contains each image, and display only the path and the label:

In [7]:
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print directly and return None, so they are not wrapped in print()
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+--------------------------------------------------------------------+-----+
|path                                                                |label|
+--------------------------------------------------------------------+-----+
|s3://aws-logs-143241216037-eu-north-1/Test/DATAP8-OC/COCOS/3_100.jpg|COCOS|
|s3://aws-logs-143241216037-eu-north-1/Test/DATAP8-OC/COCOS/8_100.jpg|COCOS|
|s3://aws-logs-143241216037-eu-north-1/Test/DATAP8-OC/COCOS/7_100.jpg|COCOS|
|s3://aws-logs-143241216037-eu-north-1/Test/DATAP8-OC/COCOS/1_100.jpg|COCOS|
|s3://aws-logs-143241216037-eu-north-1/Test/DATAP8-OC/COCOS/9_100.jpg|COCOS|
+--------------------------------------------------------------------+-----+
only showing top 5 rows
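
The label rule used above takes the second-to-last segment of the path, that is, the folder that contains the image. Here is a minimal plain-Python sketch of the same rule, which can be handy for checking it on a few paths before running the Spark job. `label_from_path` is a hypothetical helper, not part of the notebook.

```python
def label_from_path(path: str) -> str:
    """Mirror element_at(split(path, '/'), -2): the parent folder name."""
    return path.split("/")[-2]


# Path copied from the output shown above.
example = "s3://aws-logs-143241216037-eu-north-1/Test/DATAP8-OC/COCOS/3_100.jpg"
label = label_from_path(example)
```

The rule only holds while every image sits directly inside a folder named after its class; an extra nesting level would change which segment is picked.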

#### Preparing the model

In [8]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))


In [9]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)


In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())


In [11]:
new_model.summary()


Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [12]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model


#### Defining image loading and featurization with a pandas UDF
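
The preprocessing used in this step rescales pixel values before they reach the network. For MobileNetV2, Keras' `preprocess_input` maps values from [0, 255] to [-1, 1]. The NumPy sketch below reproduces that arithmetic by hand, as an illustration only: `scale_pixels` is a hypothetical name, and the notebook itself calls `preprocess_input` directly.

```python
import numpy as np


def scale_pixels(arr):
    """Map pixel values from [0, 255] to [-1, 1], i.e. x / 127.5 - 1."""
    return np.asarray(arr, dtype="float32") / 127.5 - 1.0


# Black, mid-grey and white pixel values.
pixels = np.array([0.0, 127.5, 255.0])
scaled = scale_pixels(pixels)
```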

In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # Named `batch` rather than `input` to avoid shadowing the Python builtin.
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

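
The point of the Scalar Iterator UDF above is that the model is built once per iterator and reused for every batch it yields. The plain-Python sketch below illustrates only that pattern, with a toy stand-in for the model. `load_model`, `featurize_batches` and the byte strings are hypothetical and do not involve Spark or TensorFlow.

```python
from typing import Iterator, List

load_count = 0  # counts how many times the stand-in model is built


def load_model():
    """Stand-in for model_fn(): pretend to build an expensive model."""
    global load_count
    load_count += 1
    # Toy "featurizer": the feature of an item is simply its length.
    return lambda batch: [len(item) for item in batch]


def featurize_batches(batches: Iterator[List[bytes]]) -> Iterator[List[int]]:
    model = load_model()      # done once for the whole iterator
    for batch in batches:
        yield model(batch)    # the same model is reused for every batch


batches = iter([[b"ab", b"c"], [b"def"], [b"", b"ghij"]])
results = list(featurize_batches(batches))
```

With a plain scalar UDF, the equivalent of `load_model()` would run for every batch, which is the overhead the iterator form avoids.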



#### Running the feature extraction

In [14]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

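
The commented-out setting above caps how many rows Arrow hands to the UDF in one batch (Spark's default for `spark.sql.execution.arrow.maxRecordsPerBatch` is 10000). Lowering it reduces the memory needed per `model.predict` call, at the cost of more calls. Here is a back-of-the-envelope sketch with hypothetical row counts; both helper names are illustrative, and the memory estimate covers only the decoded float32 input tensor.

```python
import math


def batches_per_partition(rows_in_partition: int, max_records_per_batch: int) -> int:
    """Number of Arrow batches needed to cover one partition."""
    return math.ceil(rows_in_partition / max_records_per_batch)


def batch_memory_mb(batch_rows: int, height: int = 224, width: int = 224,
                    channels: int = 3, bytes_per_value: int = 4) -> float:
    """Size in MiB of one stacked float32 image batch."""
    return batch_rows * height * width * channels * bytes_per_value / 1024 ** 2


# Hypothetical partition of 2500 images with a 1024-row batch cap.
n_batches = batches_per_partition(2500, 1024)
mem = batch_memory_mb(1024)
```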

In [15]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )


In [16]:
print(PATH_Result)


s3://aws-logs-143241216037-eu-north-1/Results

In [17]:
features_df.write.mode("overwrite").parquet(PATH_Result)


#### Dimensionality reduction with PCA

In [20]:
features_df.printSchema()


root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)

In [21]:
from pyspark.ml.functions import array_to_vector
df_pca=features_df.select("path","label", array_to_vector('features').alias('features'))


In [24]:
df_pca.printSchema()


root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: vector (nullable = true)

In [25]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA


In [26]:
scaler = StandardScaler(
    inputCol = 'features', 
    outputCol = 'scaledFeatures',
    withMean = True,
    withStd = True
).fit(df_pca)

# when we transform the dataframe, the old
# feature will still remain in it
df_scaled = scaler.transform(df_pca)
df_scaled.show(6)


+--------------------+-------+--------------------+--------------------+
|                path|  label|            features|      scaledFeatures|
+--------------------+-------+--------------------+--------------------+
|s3://aws-logs-143...|   KIWI|[1.47442615032196...|[1.49211764645949...|
|s3://aws-logs-143...|  COCOS|[1.21801674365997...|[0.98731027166260...|
|s3://aws-logs-143...|APRICOT|[0.94679319858551...|[0.45333748355567...|
|s3://aws-logs-143...|   CORN|[1.03584098815917...|[0.62865079000315...|
|s3://aws-logs-143...| BANANE|[1.36630952358245...|[1.27926246604011...|
|s3://aws-logs-143...|   APFG|[0.0,0.0,0.0,0.0,...|[-1.4106666310966...|
+--------------------+-------+--------------------+--------------------+
only showing top 6 rows
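
With `withMean=True` and `withStd=True`, each feature is centred on its mean and divided by its sample standard deviation, and a feature with zero variance comes out as 0.0. The NumPy sketch below reproduces that computation on a tiny made-up matrix to show what `scaledFeatures` contains. `standardize` and the sample values are hypothetical, and this is not Spark's implementation.

```python
import numpy as np


def standardize(X):
    """Centre each column and divide by its sample std (ddof=1)."""
    X = np.asarray(X, dtype=float)
    mean = X.mean(axis=0)
    std = X.std(axis=0, ddof=1)
    safe_std = np.where(std > 0, std, 1.0)   # avoid dividing by zero
    scaled = (X - mean) / safe_std
    scaled[:, std == 0] = 0.0                # constant columns become 0.0
    return scaled


# Made-up data: two varying features and one constant feature.
X = [[1.0, 10.0, 5.0],
     [2.0, 20.0, 5.0],
     [3.0, 30.0, 5.0]]
scaled = standardize(X)
```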

In [27]:
n_components = 2
pca = PCA(
    k = n_components, 
    inputCol = 'scaledFeatures', 
    outputCol = 'pcaFeatures'
).fit(df_scaled)

df_pca1 = pca.transform(df_scaled)
print('Explained Variance Ratio', pca.explainedVariance.toArray())
df_pca1.show(6)


Explained Variance Ratio [0.06636144 0.05491077]
+--------------------+-------+--------------------+--------------------+--------------------+
|                path|  label|            features|      scaledFeatures|         pcaFeatures|
+--------------------+-------+--------------------+--------------------+--------------------+
|s3://aws-logs-143...|   KIWI|[1.47442615032196...|[1.49211764645949...|[0.20370138522370...|
|s3://aws-logs-143...|  COCOS|[1.21801674365997...|[0.98731027166260...|[2.98315459008501...|
|s3://aws-logs-143...|APRICOT|[0.94679319858551...|[0.45333748355567...|[-11.229430234661...|
|s3://aws-logs-143...|   CORN|[1.03584098815917...|[0.62865079000315...|[-4.5984914862741...|
|s3://aws-logs-143...| BANANE|[1.36630952358245...|[1.27926246604011...|[-2.1063978236245...|
|s3://aws-logs-143...|   APFG|[0.0,0.0,0.0,0.0,...|[-1.4106666310966...|[-1.0963310766324...|
+--------------------+-------+--------------------+--------------------+--------------------+
only showing top 6 rows
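
The two ratios printed above add up to about 0.12, so the two retained components carry roughly 12% of the variance of the scaled features. One common way to pick `k` is to keep the smallest number of components whose cumulative ratio reaches a target. The sketch below shows that rule in plain Python. `components_for_variance` and the four-value ratio list are hypothetical; in practice the ratios would come from a PCA fitted with a larger `k`.

```python
from itertools import accumulate
from typing import Optional, Sequence


def components_for_variance(ratios: Sequence[float], target: float) -> Optional[int]:
    """Smallest k whose cumulative explained variance reaches the target."""
    for k, total in enumerate(accumulate(ratios), start=1):
        if total >= target:
            return k
    return None  # target not reached with the ratios provided


# Ratios reported in the output above, for k = 2.
reported = [0.06636144, 0.05491077]
kept_variance = sum(reported)

# Hypothetical ratios, used only to exercise the helper.
k_needed = components_for_variance([0.5, 0.3, 0.15, 0.05], target=0.9)
```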

In [30]:
df_pca1.write.mode("overwrite").parquet(PATH_PCA)
