# 📚 <span style='color:#0D2BA5'>**Perform Data Processing in a Big Data Environment on the Cloud**</span>

- <font color='RoyalBlue'>**Context:**</font>
*****
**The start-up "Fruits!", which specializes in AgriTech, wants to develop an innovative mobile application that lets users take photos of fruits and instantly get information about them. The project uses fruit images and their corresponding labels to build an image recognition model. A work-study student has already laid the groundwork in a notebook named P8_Notebook_Linux_EMR_PySpark_V1.0.ipynb, on an AWS EMR cloud environment.**

- <font color='RoyalBlue'>**Mission:**</font>
*****
**Our mission is to take over the work done by the work-study student, extend the existing processing pipeline and optimize it for future developments. This means building a scalable Big Data architecture and managing massive data efficiently, while ensuring smooth integration with the mobile application.**

- <font color='RoyalBlue'>**Problem statement:**</font>
*****
**With data volumes growing rapidly, fruit images in particular, how can we design a robust processing pipeline in a cloud environment that guarantees fast and accurate classification while remaining scalable enough to support the application's future features?**

- <font color='RoyalBlue'>**Objective:**</font>
*****
**Design a scalable processing pipeline on AWS EMR to handle massive volumes of fruit data.
Develop an efficient mobile application that recognizes fruits from simple photos, based on machine learning models built from fruit images and their associated labels.**

# <a name="C1"><font color='RoyalBlue'> 1. Importing the libraries</font></a>

In [None]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA
from pyspark.sql import functions as F

# <font color='RoyalBlue'>2. Loading the images and saving the results</font>

In [3]:
# Amazon S3 paths (PATH has no trailing slash, so the joined paths contain no "//")
PATH = 's3://s3-fruits-1/Rep1'
PATH_Data = PATH + '/Test'
PATH_Results = PATH + '/Results'

# Display the paths
print('PATH:         ' + PATH)
print('PATH_Data:    ' + PATH_Data)
print('PATH_Results: ' + PATH_Results)

Starting Spark application

| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|---|---|---|---|---|---|---|---|
| 42 | application_1728309150675_0043 | pyspark | idle | Link | Link |  | ✔ |

SparkSession available as 'spark'.

PATH:         s3://s3-fruits-1/Rep1
PATH_Data:    s3://s3-fruits-1/Rep1/Test
PATH_Results: s3://s3-fruits-1/Rep1/Results
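Plain string concatenation such as `PATH + '/Test'` yields a double slash (`Rep1//Test`) whenever the base path already ends with `/`. The sketch below shows one way to join S3 URI segments with exactly one separator; `join_s3` is a hypothetical helper written for illustration, not part of boto3, EMRFS or Spark.

```python
def join_s3(base: str, *parts: str) -> str:
    """Join an S3 URI and path segments with exactly one '/' between them.

    Hypothetical helper for illustration only.
    """
    # Drop trailing slashes from the base and surrounding slashes from each part
    segments = [base.rstrip('/')] + [p.strip('/') for p in parts if p.strip('/')]
    return '/'.join(segments)


print(join_s3('s3://s3-fruits-1/Rep1/', 'Test'))     # s3://s3-fruits-1/Rep1/Test
print(join_s3('s3://s3-fruits-1/Rep1', '/Results'))  # s3://s3-fruits-1/Rep1/Results
```

The helper only normalizes slashes; it does not check that the bucket or prefix exists.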

## <font color='DodgerBlue'> 2.1 Loading the data</font>

In [4]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)


In [5]:
images.show(5)


+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://s3-fruits-1/...|2024-10-07 12:04:15|125135|[FF D8 FF E0 00 1...|
|s3://s3-fruits-1/...|2024-10-07 12:04:16|124785|[FF D8 FF E0 00 1...|
|s3://s3-fruits-1/...|2024-10-07 12:04:15|123514|[FF D8 FF E0 00 1...|
|s3://s3-fruits-1/...|2024-10-07 12:04:18|122958|[FF D8 FF E0 00 1...|
|s3://s3-fruits-1/...|2024-10-07 12:04:15|122807|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
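Every `content` value shown above starts with the bytes `FF D8 FF`: the JPEG Start Of Image marker (`FF D8`) followed by the next segment marker (`FF E0`, the JFIF APP0 segment). Assuming you want to reject files that carry a `.jpg` name without actually containing JPEG data, a signature check such as the hypothetical `is_jpeg` below is a minimal sketch; in Spark it could be wrapped in a UDF and used in a `filter` before featurization.

```python
JPEG_SIGNATURE = b'\xff\xd8\xff'  # SOI marker (FF D8) followed by the FF of the next marker


def is_jpeg(content: bytes) -> bool:
    """Return True if the raw bytes start with the JPEG signature (bytes or bytearray)."""
    return content[:3] == JPEG_SIGNATURE


# First bytes as displayed in the content column above (FF D8 FF E0 00 1...)
print(is_jpeg(bytes.fromhex('FFD8FFE00010')))  # True
print(is_jpeg(b'\x89PNG\r\n\x1a\n'))           # False: this is the PNG signature
```

Spark hands `binary` columns to Python as `bytearray`, which compares equal to `bytes` here, so the same function works on both.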

## <font color='DodgerBlue'> 2.2 Adding the image labels and selecting columns</font>

In [6]:
# The label is the name of the folder containing the image (second-to-last segment of the path)
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print their output themselves (both return None)
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+-------------------------------------------------+-----------+
|path                                             |label      |
+-------------------------------------------------+-----------+
|s3://s3-fruits-1/Rep1/Test/apple_hit_1/r0_115.jpg|apple_hit_1|
|s3://s3-fruits-1/Rep1/Test/apple_hit_1/r0_119.jpg|apple_hit_1|
|s3://s3-fruits-1/Rep1/Test/apple_hit_1/r0_107.jpg|apple_hit_1|
|s3://s3-fruits-1/Rep1/Test/apple_hit_1/r0_143.jpg|apple_hit_1|
|s3://s3-fruits-1/Rep1/Test/apple_hit_1/r0_111.jpg|apple_hit_1|
+-------------------------------------------------+-----------+
only showing top 5 rows
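The expression `element_at(split(path, '/'), -2)` takes the second-to-last segment of the path, i.e. the folder that holds the image. The pure-Python equivalent below (hypothetical helper `label_from_path`) makes the rule easy to check on a sample path. It also exposes the assumption behind it: each image must sit in a folder named after its class, otherwise the label would be whatever the parent folder is (for example `Test`).

```python
def label_from_path(path: str) -> str:
    """Return the parent folder name, i.e. the second-to-last '/'-separated segment."""
    return path.split('/')[-2]


print(label_from_path('s3://s3-fruits-1/Rep1/Test/apple_hit_1/r0_115.jpg'))  # apple_hit_1
print(label_from_path('s3://s3-fruits-1/Rep1/Test/r0_115.jpg'))              # Test
```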

# <font color='RoyalBlue'> 3. Preparing the model</font>

In [8]:
# MobileNetV2 pre-trained on ImageNet, with its classification head (removed in the next cell)
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))


Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [9]:
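# Sub-model ending at the penultimate layer (global average pooling): one 1280-value vector per image
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)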
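In Keras' MobileNetV2 with `include_top=True`, the layer just before the final `predictions` layer is a global average pooling layer. For a 224×224 input, the last convolutional block outputs a 7×7×1280 tensor, and averaging it over the two spatial axes gives the 1280-value vector per image observed in section 3.3. The NumPy sketch below reproduces that pooling step on random data; the tensor is a stand-in, not real activations.

```python
import numpy as np

rng = np.random.default_rng(0)
# Stand-in for the last conv block output of one image: (batch, height, width, channels)
feature_map = rng.random((1, 7, 7, 1280), dtype=np.float32)

# Global average pooling: mean over the spatial axes (height and width)
pooled = feature_map.mean(axis=(1, 2))
print(pooled.shape)  # (1, 1280)
```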


In [10]:
# Broadcast the weights once to all executors; model_fn() loads them back on the workers
brodcast_weights = sc.broadcast(new_model.get_weights())
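A broadcast variable is shipped to each executor once and cached there, instead of being serialized with every task. To gauge how much data that represents, the total can be computed from the arrays' `nbytes`. The hypothetical helper below does this on toy arrays whose shapes mirror the first two layers of the summary (Conv1: 864 parameters, bn_Conv1: 128 parameters); on the driver, `total_nbytes(new_model.get_weights())` would give the real size.

```python
import numpy as np


def total_nbytes(weights):
    """Total memory footprint, in bytes, of a list of NumPy weight arrays."""
    return sum(w.nbytes for w in weights)


# Toy weights mirroring the first two layers of the summary:
# Conv1 kernel (3x3, 3 input channels, 32 filters, no bias) = 864 values,
# bn_Conv1 gamma, beta, moving mean and moving variance (4 x 32) = 128 values
toy_weights = [np.zeros((3, 3, 3, 32), dtype=np.float32)] + \
              [np.zeros(32, dtype=np.float32) for _ in range(4)]

print(sum(w.size for w in toy_weights))  # 992
print(total_nbytes(toy_weights))         # 3968 (992 float32 values x 4 bytes)
```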


In [11]:
new_model.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_1[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']        

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with its top (classification) layer removed,
    loaded with the broadcast pre-trained weights.
    """
    # weights=None builds the architecture without downloading the ImageNet
    # weights on every executor: they are overwritten by the broadcast copy
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

## <font color='DodgerBlue'> 3.1 Loading the images and computing their features (featurization)</font>

In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # convert('RGB') guarantees 3 channels, even for grayscale or RGBA images
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
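The Scalar Iterator pandas UDF receives an iterator over the batches of a partition, so `model_fn()` runs once per partition rather than once per batch. The sketch below reproduces that control flow without Spark or TensorFlow: `load_fake_model` is a hypothetical stand-in for `model_fn()`, and `preprocess_like_mobilenet_v2` mimics the scaling applied by MobileNetV2's `preprocess_input` (pixels mapped from [0, 255] to [-1, 1]). Resizing is omitted for brevity.

```python
from typing import Iterator, List

import numpy as np

LOAD_COUNT = 0  # how many times the hypothetical model has been built


def load_fake_model():
    """Hypothetical stand-in for model_fn(): building the model is the costly step."""
    global LOAD_COUNT
    LOAD_COUNT += 1
    weights = np.full(3, 2.0, dtype=np.float32)
    # Toy "model": keep the first 3 values of each flattened image and scale them
    return lambda batch: batch.reshape(len(batch), -1)[:, :3] * weights


def preprocess_like_mobilenet_v2(pixels: np.ndarray) -> np.ndarray:
    """Scale pixels from [0, 255] to [-1, 1], like MobileNetV2's preprocess_input."""
    return pixels.astype(np.float32) / 127.5 - 1.0


def featurize_batches(batches: Iterator[List[np.ndarray]]) -> Iterator[List[np.ndarray]]:
    """Scalar-iterator pattern: build the model once, then reuse it for every batch."""
    model = load_fake_model()
    for batch in batches:
        x = np.stack([preprocess_like_mobilenet_v2(img) for img in batch])
        preds = model(x)
        yield [p.flatten() for p in preds]


# Four tiny 2x2 RGB toy images (all black or all white), split into two batches
toy_images = [np.full((2, 2, 3), v, dtype=np.uint8) for v in (0, 255, 0, 255)]
results = list(featurize_batches(iter([toy_images[:2], toy_images[2:]])))

print(LOAD_COUNT)     # 1: the model was built once for both batches
print(results[0][0])  # [-2. -2. -2.]
print(results[0][1])  # [2. 2. 2.]
```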


## <font color='DodgerBlue'> 3.2 Running the feature-extraction actions</font>

In [14]:
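# Spread the images over 24 partitions and declare the featurization (lazy: it runs at the write below)
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )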
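`repartition(24)` redistributes the rows round-robin, so the 24 partitions end up roughly balanced. The sketch below computes an idealized, perfectly even split of the 22,433 images counted in section 3.3 (Spark's actual assignment may differ by a few rows). Because `spark.sql.execution.arrow.maxRecordsPerBatch` defaults to 10,000, a partition of about 935 rows should reach the pandas UDF as a single Arrow batch, so `np.stack` in `featurize_series` builds the whole preprocessed partition in memory at once; the last lines estimate that size.

```python
def even_split(n_rows: int, n_partitions: int) -> list:
    """Partition sizes for an ideal, perfectly balanced split of n_rows rows."""
    base, extra = divmod(n_rows, n_partitions)
    return [base + 1] * extra + [base] * (n_partitions - extra)


sizes = even_split(22433, 24)              # 22433 = number of rows counted in section 3.3
print(max(sizes), min(sizes), sum(sizes))  # 935 934 22433

# Size of one preprocessed batch if a whole partition arrives as a single batch:
# 224 x 224 x 3 float32 values (4 bytes each) per image
bytes_per_image = 224 * 224 * 3 * 4
print(max(sizes) * bytes_per_image)        # 562974720 (about 563 MB)
```

If executor memory becomes a concern, lowering `spark.sql.execution.arrow.maxRecordsPerBatch` or increasing the number of partitions reduces the size of each batch.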


In [17]:
# Action: runs the featurization and writes the features as Parquet
features_df.write.mode("overwrite").parquet(PATH_Results)


## <font color='DodgerBlue'> 3.3 Loading the saved data and validating the result</font>

In [19]:
# Open the Parquet output (reading an s3:// path with pandas requires the s3fs package)
df = pd.read_parquet(PATH_Results, engine='pyarrow')


In [21]:
# Shape of the features column (one array per image)
df.loc[0,'features'].shape


(1280,)

In [22]:
# Shape of df (rows, columns)
df.shape


(22433, 3)
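Beyond checking the shape of the first row, every row can be validated. The hypothetical helper below checks that each feature vector has the expected length (1280, as shown above) and contains only finite values. On the real data one could call `check_features(df['features'])` and look at `df['label'].value_counts()` to check the class distribution.

```python
import numpy as np


def check_features(features, expected_dim=1280):
    """True if every feature vector has shape (expected_dim,) and only finite values."""
    return all(
        np.asarray(f).shape == (expected_dim,) and bool(np.isfinite(f).all())
        for f in features
    )


print(check_features([np.zeros(1280), np.ones(1280)]))            # True
print(check_features([np.zeros(1280), np.zeros(10)]))             # False: wrong length
print(check_features([np.array([0.0, np.nan])], expected_dim=2))  # False: NaN value
```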

# <font color='RoyalBlue'> 4. Dimensionality reduction</font>

In [23]:
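# Re-read the saved features: features_df is lazy, so reusing it would re-run the MobileNetV2 featurization
features_saved = spark.read.parquet(PATH_Results)
# Convert the features column (array<float>) into Spark ML vectors
to_vector = F.udf(lambda x: Vectors.dense(x), VectorUDT())
sparkDF = features_saved.select('path', 'label', 'features', to_vector("features").alias("features_vec"))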


In [25]:
# PCA keeping k=2 principal components
pcaSparkEstimator = PCA(inputCol="features_vec", outputCol="pca_Features", k=2)
pca = pcaSparkEstimator.fit(sparkDF)
pca_matrix = pca.transform(sparkDF)
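PCA with `k=2` keeps only two of the 1280 dimensions, so it is worth checking how much variance they retain; the fitted Spark model exposes this as `pca.explainedVariance`. The NumPy sketch below shows the underlying computation (SVD of the centered data) on a toy example where one component carries all the variance. Component signs are arbitrary, and Spark's `PCAModel.transform` projects the vectors without subtracting the mean, so Spark's coordinates may differ from this sketch by a sign and a constant offset.

```python
import numpy as np


def pca_numpy(X, k=2):
    """Project the rows of X onto their top-k principal components.

    Uses the SVD of the centered data and also returns the fraction of the
    total variance carried by each of the k components.
    """
    X = np.asarray(X, dtype=np.float64)
    Xc = X - X.mean(axis=0)                     # center every feature (column)
    _, s, vt = np.linalg.svd(Xc, full_matrices=False)
    explained_ratio = s ** 2 / np.sum(s ** 2)   # variance share of each component
    return Xc @ vt[:k].T, explained_ratio[:k]


# Points lying exactly on a line: the first component carries all the variance
X = np.array([[1.0, 2.0], [2.0, 4.0], [3.0, 6.0], [4.0, 8.0]])
projected, ratio = pca_numpy(X, k=2)
print(projected.shape)     # (4, 2)
print(round(ratio[0], 6))  # 1.0
```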


In [None]:
# Keep only the columns of interest
pca_matrix_final = pca_matrix.select('path','label','pca_Features')

# <font color='RoyalBlue'>5. Exporting to CSV</font>

In [29]:
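# Explicit .csv key outside the Parquet folder (file name chosen for illustration); s3:// writes need s3fs
pca_matrix_final.toPandas().to_csv('s3://s3-fruits-1/Rep1/Results_PCA/pca_features.csv', index=False)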
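`toPandas()` keeps `pca_Features` as Spark `DenseVector` objects, which `to_csv` writes as their string form in a single cell. Splitting the vector into one column per component gives a flatter file. The stdlib sketch below (hypothetical `pca_rows_to_csv`, toy PCA values) shows that target layout; it accepts any sequence of numbers, e.g. `v.toArray()` for a `DenseVector`. In Spark itself, `pyspark.ml.functions.vector_to_array` (Spark 3.0+) followed by `getItem(0)` and `getItem(1)` would produce the same columns before a `write.csv`.

```python
import csv
import io


def pca_rows_to_csv(rows, k=2):
    """Write (path, label, pca_vector) rows as CSV text, one column per PCA component."""
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator='\n')
    writer.writerow(['path', 'label'] + [f'pc{i + 1}' for i in range(k)])
    for path, label, vector in rows:
        writer.writerow([path, label, *list(vector)[:k]])
    return buffer.getvalue()


# Toy row: path and label taken from section 2.2, PCA values made up
csv_text = pca_rows_to_csv([
    ('s3://s3-fruits-1/Rep1/Test/apple_hit_1/r0_115.jpg', 'apple_hit_1', [0.5, -1.25]),
])
print(csv_text)
# path,label,pc1,pc2
# s3://s3-fruits-1/Rep1/Test/apple_hit_1/r0_115.jpg,apple_hit_1,0.5,-1.25
```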