# Deploy a model in the cloud

The very young AgriTech start-up "Fruits!"
is looking to offer innovative solutions for fruit harvesting.

The company's goal is to preserve fruit biodiversity
by enabling species-specific treatments
through the development of intelligent fruit-picking robots.

As a first step, the start-up wants to make itself known by releasing
a mobile application for the general public that lets users
take a photo of a fruit and get information about it.

For the start-up, this application would raise public awareness
of fruit biodiversity and put in place a first version of the
fruit image classification engine.

In addition, developing the mobile application will make it possible to build
a first version of the required Big Data architecture.

------------------------------------------------------------------------------------------------------------------------------
### Objectives


 - Develop a first data processing pipeline that includes preprocessing and a dimensionality reduction step. It is not necessary to train a model for now.
 - Take into account that the data volume will grow very quickly after this project is delivered, which implies:
     - Deploying the data processing in a Big Data environment
     - Developing the scripts in PySpark to perform distributed computing
     - Using the AWS cloud to benefit from a Big Data architecture (EMR, S3, IAM)

In [1]:
import pandas as pd

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1673190521502_0001,pyspark,idle,Link,Link,,✔


SparkSession available as 'spark'.


In [2]:
from PIL import Image
import numpy as np
import io
import os

In [3]:
import warnings
# Silence library warnings in the notebook output
warnings.filterwarnings('ignore')

In [5]:
import matplotlib.pyplot as plt
import tensorflow as tf

from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

In [6]:
# data handling
from pyspark.sql.functions import udf
from pyspark.sql.functions import input_file_name
from pyspark.sql.types import *
from typing import Iterator

# ml tasks
from pyspark.ml.image import ImageSchema
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import PCA
from pyspark.ml.feature import StringIndexer
# transform
from pyspark.ml.linalg import Vectors, VectorUDT

In [7]:
PATH = 's3://oc-gulsump8'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

PATH:        s3://oc-gulsump8
PATH_Data:   s3://oc-gulsump8/Test
PATH_Result: s3://oc-gulsump8/Results

In [8]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [9]:
images.show(5)

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://oc-gulsump8/...|2022-12-31 12:48:10|  7353|[FF D8 FF E0 00 1...|
|s3://oc-gulsump8/...|2022-12-31 12:48:13|  7350|[FF D8 FF E0 00 1...|
|s3://oc-gulsump8/...|2022-12-31 12:48:12|  7349|[FF D8 FF E0 00 1...|
|s3://oc-gulsump8/...|2022-12-31 12:48:11|  7348|[FF D8 FF E0 00 1...|
|s3://oc-gulsump8/...|2022-12-31 12:15:25|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

In [10]:
# The label is the parent folder name, i.e. the second-to-last path segment
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
# printSchema() and show() print directly and return None, so they are not wrapped in print()
images.printSchema()
images.select('path','label').show(5,False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+----------------------------------------------+----------+
|path                                          |label     |
+----------------------------------------------+----------+
|s3://oc-gulsump8/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://oc-gulsump8/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://oc-gulsump8/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://oc-gulsump8/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://oc-gulsump8/Test/Watermelon/r_95_100.jpg |Watermelon|
+----------------------------------------------+----------+
only showing top 5 rows
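
The `label` column is taken from the second-to-last segment of each path, which is the folder that contains the image. As a minimal plain-Python sketch of the same rule (the helper name `label_from_path` is hypothetical and not part of the notebook):

```python
def label_from_path(path: str) -> str:
    """Return the name of the folder that directly contains the file.

    Mirrors element_at(split(path, '/'), -2): split on '/' and take
    the second-to-last segment.
    """
    return path.split('/')[-2]


print(label_from_path('s3://oc-gulsump8/Test/Watermelon/r_106_100.jpg'))
```

This rule only holds while the images are stored one folder per class, as in the paths shown above.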

# Preprocessing

In [11]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [12]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [13]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [14]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            [(None, 224, 224, 3) 0                                            
__________________________________________________________________________________________________
Conv1 (Conv2D)                  (None, 112, 112, 32) 864         input_1[0][0]                    
__________________________________________________________________________________________________
bn_Conv1 (BatchNormalization)   (None, 112, 112, 32) 128         Conv1[0][0]                      
__________________________________________________________________________________________________
Conv1_relu (ReLU)               (None, 112, 112, 32) 0           bn_Conv1[0][0]                   
______________________________________________________________________________________________

In [15]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [16]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
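    # Force 3 channels so that grayscale or RGBA files still match the (224, 224, 3) input
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))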
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    # 'batch' avoids shadowing the built-in input()
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
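
In `preprocess`, the MobileNetV2 version of `preprocess_input` rescales pixel values from the [0, 255] range to [-1, 1]. The following sketch reproduces that arithmetic on a plain Python list, to make the input range expected by the network explicit; `scale_pixels` is a hypothetical helper, not a Keras function.

```python
def scale_pixels(values):
    """Map pixel intensities from [0, 255] to [-1, 1]."""
    return [v / 127.5 - 1.0 for v in values]


print(scale_pixels([0, 127.5, 255]))  # [-1.0, 0.0, 1.0]
```

Images that skip this step would be fed to the network on a different scale from the one the pretrained weights were fitted on.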

In [17]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [18]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )
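
With `spark.sql.execution.arrow.maxRecordsPerBatch` set to 1024 and the images repartitioned into 24 partitions, each pandas batch handed to `featurize_udf` holds at most 1024 images. A rough way to estimate how many batches the largest partition produces, assuming rows are spread evenly across partitions (the function name is hypothetical):

```python
import math


def batches_per_partition(n_rows: int, n_partitions: int, max_records: int) -> int:
    """Estimate the number of Arrow batches in the largest partition."""
    rows_per_partition = math.ceil(n_rows / n_partitions)
    return math.ceil(rows_per_partition / max_records)


# 22,688 is the row count reported by df.shape at the end of this notebook
print(batches_per_partition(22688, 24, 1024))
```

At this dataset size each partition fits in a single batch under that assumption; as the volume grows, the batch size caps how many images are passed to one `model.predict` call.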

In [19]:
features_df.show(5)

+--------------------+--------------+--------------------+
|                path|         label|            features|
+--------------------+--------------+--------------------+
|s3://oc-gulsump8/...|    Watermelon|[0.04279928, 0.09...|
|s3://oc-gulsump8/...|    Watermelon|[0.014875885, 0.2...|
|s3://oc-gulsump8/...|Pineapple Mini|[0.0, 4.7234764, ...|
|s3://oc-gulsump8/...|Pineapple Mini|[0.0, 4.7648377, ...|
|s3://oc-gulsump8/...|    Watermelon|[0.11024109, 0.19...|
+--------------------+--------------+--------------------+
only showing top 5 rows

### Dimensionality reduction (PCA)

In [20]:
# Build a user-defined function that turns the feature lists into dense vectors.
array_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

In [21]:
vectorized_df = features_df.withColumn('cnn_vectors', array_to_vector_udf('features'))

In [22]:
vectorized_df.show(2, True)

+--------------------+----------+--------------------+--------------------+
|                path|     label|            features|         cnn_vectors|
+--------------------+----------+--------------------+--------------------+
|s3://oc-gulsump8/...|Watermelon|[0.04279928, 0.09...|[0.04279927909374...|
|s3://oc-gulsump8/...|Watermelon|[0.014875885, 0.2...|[0.01487588509917...|
+--------------------+----------+--------------------+--------------------+
only showing top 2 rows

In [23]:
# PCA dimensionality reduction
def reduc_dim_pca(features):

    # Keep only the columns needed downstream;
    # 'cnn_vectors' already holds the features as dense vectors
    img_vd_df = features.select("path","label","cnn_vectors")
    
    # Fit a PCA with k=100 components, then project the CNN features onto them
    pca_spark = PCA(inputCol="cnn_vectors", outputCol="features_pca", k=100)
    pca = pca_spark.fit(img_vd_df)
    pca_matrix = pca.transform(img_vd_df)
    
    return pca, pca_matrix
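
The number of components is fixed at `k=100` above. One common way to check such a choice is to look at the cumulative explained variance. The sketch below assumes the per-component ratios have been copied into a plain Python list (a fitted Spark `PCAModel` exposes them as `explainedVariance`); the helper name and the sample ratios are illustrative only and are not results from this dataset.

```python
from itertools import accumulate


def smallest_k(ratios, threshold):
    """Return the smallest number of leading components whose cumulative
    explained variance reaches `threshold`, or None if it is never reached."""
    for k, total in enumerate(accumulate(ratios), start=1):
        if total >= threshold:
            return k
    return None


# Made-up ratios, for illustration only
example_ratios = [0.5, 0.25, 0.125, 0.125]
print(smallest_k(example_ratios, 0.8))
```

If the function returns `None` for the ratios of the fitted model, the retained components do not reach the target and `k` would need to be raised.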

In [24]:
# Apply the PCA dimensionality reduction
pca, pca_matrix = reduc_dim_pca(vectorized_df)


In [25]:
# Inverse transformation (from Vectors back to arrays)
vector_to_array_udf = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))

In [26]:
final_df = pca_matrix.withColumn('features', vector_to_array_udf('features_pca'))

In [27]:
final_df.show(10, True)

+--------------------+--------------+--------------------+--------------------+--------------------+
|                path|         label|         cnn_vectors|        features_pca|            features|
+--------------------+--------------+--------------------+--------------------+--------------------+
|s3://oc-gulsump8/...|    Watermelon|[0.04279927909374...|[-2.0967874076741...|[-2.0967875, 5.78...|
|s3://oc-gulsump8/...|    Watermelon|[0.01487588509917...|[-1.8770516056425...|[-1.8770516, 3.28...|
|s3://oc-gulsump8/...|Pineapple Mini|[0.0,4.7234764099...|[-5.9249884518216...|[-5.9249883, 3.36...|
|s3://oc-gulsump8/...|Pineapple Mini|[0.0,4.7648377418...|[-4.8288816936532...|[-4.8288817, 2.31...|
|s3://oc-gulsump8/...|    Watermelon|[0.11024109274148...|[-2.6555744153131...|[-2.6555743, 2.17...|
|s3://oc-gulsump8/...|    Watermelon|[0.11004009842872...|[-2.8012731267059...|[-2.801273, 2.576...|
|s3://oc-gulsump8/...|Pineapple Mini|[0.0,4.7182641029...|[-6.2835548731391...|[-6.283555, 

## Saving to the cloud

#### Saving the processed data in "**parquet**" format

In [28]:
print(PATH_Result)

s3://oc-gulsump8/Results

In [29]:
# Save the data (parquet format)
final_df.write.mode('overwrite').parquet(PATH_Result)

#### Saving the processed data in "**csv**" format

### Loading the saved data and validating the result

<u>Load the saved data into a **Pandas DataFrame**</u>:

In [32]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [33]:
df.head()

                                               path  ...                                           features
0     s3://oc-gulsump8/Test/Watermelon/r_88_100.jpg  ...  [-2.0967875, 5.7894607, -6.371486, -4.2758536,...
1    s3://oc-gulsump8/Test/Watermelon/r_102_100.jpg  ...  [-1.8770516, 3.282514, -6.7290897, -2.6465309,...
2   s3://oc-gulsump8/Test/Pineapple Mini/53_100.jpg  ...  [-5.9249883, 3.3677554, 1.8359057, -4.0381913,...
3  s3://oc-gulsump8/Test/Pineapple Mini/327_100.jpg  ...  [-4.8288817, 2.313498, 1.5447862, -3.3649287, ...
4      s3://oc-gulsump8/Test/Watermelon/268_100.jpg  ...  [-2.6555743, 2.1750786, -6.776769, -4.572865, ...

[5 rows x 5 columns]

In [34]:
df.loc[0,'features'].shape

(100,)

In [35]:
df.shape

(22688, 5)
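
The checks above inspect one row and the overall shape. A slightly stricter validation would confirm that every feature vector has the expected length. The sketch below works on any iterable of sequences, such as `df['features']`; the function name is hypothetical.

```python
def all_have_length(vectors, expected_length):
    """Return True if every vector has exactly `expected_length` entries."""
    return all(len(v) == expected_length for v in vectors)


# Toy data standing in for the 'features' column
print(all_have_length([[0.1, 0.2], [0.3, 0.4]], 2))
```

On the DataFrame loaded above, the call would be `all_have_length(df['features'], 100)`.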

In [41]:
# Save the data (CSV format)
df.to_csv('s3://oc-gulsump8/Csv_resultsp8.csv', index=False)


# Implementing a Big Data architecture