### Project 9: Carrying out data processing in a Big Data environment on the Cloud

Our start-up "Fruits!" is a company in the AgriTech sector. We develop solutions that help preserve fruit biodiversity by enabling species-specific treatments, carried out by intelligent fruit-picking robots.

We have decided to launch a mobile application that lets users identify fruits simply by taking a photo of them.

Contents:
- Starting the Spark session
- Installing packages
- Importing libraries
- Defining the paths used to load images and save results
- Data processing
  - Loading the data
  - Preparing the model
  - Defining the image-loading process and applying featurization with a pandas UDF
  - Running the feature-extraction actions
- Loading the saved data and validating the result
- PCA on 25% of the images
- Conclusion

### Starting the Spark session

In [1]:
# Running this cell starts the Spark application

Starting Spark application


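| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|----|---------------------|------|-------|----------|------------|------|------------------|
| 3 | application_1728353915014_0004 | pyspark | idle | Link | Link |  | ✔ |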



SparkSession available as 'spark'.



<u>Display information about the current session and links to the Spark UI</u>:

In [2]:
%%info

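| ID | YARN Application ID | Kind | State | Spark UI | Driver log | User | Current session? |
|----|---------------------|------|-------|----------|------------|------|------------------|
| 3 | application_1728353915014_0004 | pyspark | idle | Link | Link |  | ✔ |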


### Installing packages

The required packages were installed by the **bootstrap** step when the server was instantiated.
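Bootstrap actions normally run on every node when the cluster starts, so the libraries imported below are expected to be present. A quick way to check this from the notebook is to query installed distribution versions with the standard library. This is a minimal sketch: the helper name `installed_versions` and the list of distribution names are illustrative assumptions, not taken from the actual bootstrap script, and the check only covers the driver, not the executors.

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if missing.

    Hypothetical helper: it only inspects the Python environment it runs in
    (here, the driver), not the executors.
    """
    result = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


# Illustrative list: adjust it to the packages actually installed by the bootstrap script.
print(installed_versions(["pandas", "pyarrow", "tensorflow", "pillow", "s3fs"]))
```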

### Importing libraries

In [3]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split


### Defining the paths used to load images and save results

We access our **data on S3** directly (through EMRFS on EMR), as if it were **stored locally**.

In [4]:
PATH = 's3://bucketoc'
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)


PATH:        s3://bucketoc
PATH_Data:   s3://bucketoc/Test
PATH_Result: s3://bucketoc/Results

### Data processing

#### Loading the data

In [5]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)


In [6]:
images.show(5)


+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://bucketoc/Tes...|2024-10-07 22:02:40|  7353|[FF D8 FF E0 00 1...|
|s3://bucketoc/Tes...|2024-10-07 22:02:42|  7350|[FF D8 FF E0 00 1...|
|s3://bucketoc/Tes...|2024-10-07 22:02:41|  7349|[FF D8 FF E0 00 1...|
|s3://bucketoc/Tes...|2024-10-07 22:02:41|  7348|[FF D8 FF E0 00 1...|
|s3://bucketoc/Tes...|2024-10-07 22:01:43|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows
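The `binaryFile` source returns one row per file with four columns: `path`, `modificationTime`, `length` and `content` (the raw bytes; `FF D8 FF E0` is the usual JPEG/JFIF signature). With `recursiveFileLookup` the sub-folders are scanned, and `pathGlobFilter` keeps only the file names matching `*.jpg`. The pure-Python sketch below mimics that behaviour on a local directory to make the semantics concrete; `read_binary_files` is a hypothetical helper, not a Spark API.

```python
import datetime as dt
from fnmatch import fnmatch
from pathlib import Path


def read_binary_files(root, glob_filter="*.jpg"):
    """Hypothetical local analogue of spark.read.format("binaryFile") with
    recursiveFileLookup=true and pathGlobFilter=glob_filter.

    Returns one dict per matching file, with the same four fields as the
    Spark schema (path, modificationTime, length, content).
    """
    rows = []
    for p in sorted(Path(root).rglob("*")):  # walk all sub-folders
        # Like pathGlobFilter, the pattern is matched against the file name only
        if p.is_file() and fnmatch(p.name, glob_filter):
            stat = p.stat()
            rows.append({
                "path": p.as_posix(),
                "modificationTime": dt.datetime.fromtimestamp(stat.st_mtime),
                "length": stat.st_size,
                "content": p.read_bytes(),
            })
    return rows
```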

<u>I add a column containing each image's **label**, taken from the name of its parent folder, and display it next to the image **path**</u>:

In [7]:
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print directly and return None, so they are not wrapped in print()
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+-------------------------------------------+----------+
|path                                       |label     |
+-------------------------------------------+----------+
|s3://bucketoc/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://bucketoc/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://bucketoc/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://bucketoc/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://bucketoc/Test/Watermelon/r_95_100.jpg |Watermelon|
+-------------------------------------------+----------+
only showing top 5 rows
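`split(images['path'], '/')` cuts the path into its `/`-separated components, and `element_at(..., -2)` picks the second-to-last one (negative indices count from the end, `-1` being the file name), i.e. the fruit folder. The same logic in plain Python, written as a hypothetical helper for illustration:

```python
def label_from_path(path: str) -> str:
    """Hypothetical helper: same result as element_at(split(path, '/'), -2),
    i.e. the name of the image's parent folder."""
    parts = path.split("/")
    return parts[-2]  # parts[-1] would be the file name itself


print(label_from_path("s3://bucketoc/Test/Watermelon/r_106_100.jpg"))    # Watermelon
print(label_from_path("s3://bucketoc/Test/Pineapple Mini/23_100.jpg"))  # Pineapple Mini
```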

#### Preparing the model

In [8]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))


In [9]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)


In [10]:
brodcast_weights = sc.broadcast(new_model.get_weights())
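`sc.broadcast` sends one copy of the weights to each executor instead of embedding them in the closure of every task. To get an idea of the volume involved, here is a minimal sketch that measures the raw and pickled size of a list of NumPy arrays such as the one returned by `new_model.get_weights()`. The helper names are hypothetical, and the pickled size is only a rough proxy for what PySpark actually ships.

```python
import pickle

import numpy as np


def weights_nbytes(weights):
    """Hypothetical helper: raw size in bytes of a list of NumPy weight arrays."""
    return sum(w.nbytes for w in weights)


def pickled_nbytes(weights):
    """Hypothetical helper: size of the pickled list, a rough proxy for the
    payload of a PySpark broadcast (which pickles its value)."""
    return len(pickle.dumps(weights, protocol=pickle.HIGHEST_PROTOCOL))


# Toy example: the kernel of MobileNetV2's first convolution (3x3x3x32 float32,
# i.e. the 864 parameters of Conv1 in the summary below)
toy = [np.zeros((3, 3, 3, 32), dtype=np.float32)]
print(weights_nbytes(toy), pickled_nbytes(toy))
# In the notebook, one could call: weights_nbytes(new_model.get_weights())
```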


In [11]:
new_model.summary()


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [12]:
def model_fn():
    """
    Returns a MobileNetV2 model with the top (classification) layer removed
    and the broadcasted pretrained weights.
    """
    # weights=None: only the architecture is built, so the ImageNet weights are not
    # downloaded again on every worker; they are overwritten by the broadcast below.
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model


#### Defining the image-loading process and applying featurization with a pandas UDF

In [13]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    # convert('RGB') guarantees 3 channels (e.g. for grayscale or RGBA files)
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    batch = np.stack(content_series.map(preprocess))  # avoids shadowing the built-in input()
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)
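Two ideas carry this cell: MobileNetV2's `preprocess_input` rescales pixel values from [0, 255] to [-1, 1], and the Scalar Iterator UDF builds the model once per iterator (roughly, once per partition processed by a task) before looping over its batches. The NumPy sketch below illustrates both on toy data; `scale_like_mobilenet_v2`, `featurize_batches` and the fake averaging model are hypothetical stand-ins, not the Keras or Spark implementations.

```python
import numpy as np


def scale_like_mobilenet_v2(arr):
    """Hypothetical stand-in: map pixel values from [0, 255] to [-1, 1],
    the scaling applied by MobileNetV2's preprocess_input."""
    return arr.astype(np.float32) / 127.5 - 1.0


def featurize_batches(batches, load_model):
    """Scalar-iterator pattern: build the (expensive) model once,
    then reuse it for every batch produced by the iterator."""
    model = load_model()  # runs once per iterator, not once per batch
    for batch in batches:
        x = np.stack([scale_like_mobilenet_v2(img) for img in batch])
        preds = model(x)
        # Flatten each prediction into a 1-D vector, as featurize_series does
        yield [p.ravel() for p in preds]


# Toy usage with a hypothetical "model" that averages over the spatial axes
loads = []


def fake_load_model():
    loads.append(1)                        # count how many times the model is built
    return lambda x: x.mean(axis=(1, 2))   # (n, 224, 224, 3) -> (n, 3)


toy_images = [np.full((224, 224, 3), 255, dtype=np.uint8) for _ in range(4)]
out = list(featurize_batches(iter([toy_images[:2], toy_images[2:]]), fake_load_model))
print(len(loads), len(out))  # 1 2: one model build for two batches
```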




#### Running the feature-extraction actions

In [14]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")


In [15]:
features_df = images.repartition(30).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )
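`spark.sql.execution.arrow.maxRecordsPerBatch` caps the number of rows per Arrow batch handed to the pandas UDF, and `repartition(30)` spreads the images over 30 partitions. With the 13,550 images counted further down, a back-of-the-envelope calculation shows that each partition fits in a single batch. This sketch assumes a perfectly even split (Spark's round-robin repartitioning only approximates it), and the helper name is hypothetical.

```python
import math


def arrow_batches_per_partition(n_rows, n_partitions, max_records_per_batch):
    """Hypothetical helper: assuming n_rows are split as evenly as possible over
    n_partitions, return (batches per partition, rows per partition), where each
    batch holds at most max_records_per_batch rows."""
    base, extra = divmod(n_rows, n_partitions)
    sizes = [base + 1] * extra + [base] * (n_partitions - extra)
    batches = [math.ceil(s / max_records_per_batch) for s in sizes]
    return batches, sizes


batches, sizes = arrow_batches_per_partition(13550, 30, 1024)
print(max(sizes), min(sizes), set(batches))  # 452 451 {1}
```

Under this assumption, each task would call `model_fn()` once and `model.predict` would receive about 450 images at a time.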


In [16]:
print(PATH_Result)


s3://bucketoc/Results

In [17]:
features_df.write.mode("overwrite").parquet(PATH_Result)


### Loading the saved data and validating the result

In [18]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')


In [19]:
df.head()


                                            path  ...                                           features
0     s3://bucketoc/Test/Watermelon/r_87_100.jpg  ...  [0.029508123, 0.06816347, 0.0, 0.016227119, 0....
1   s3://bucketoc/Test/Pineapple Mini/23_100.jpg  ...  [0.0, 4.9297256, 0.0, 0.0, 0.0, 0.0, 0.1340701...
2   s3://bucketoc/Test/Pineapple Mini/30_100.jpg  ...  [0.0, 4.538602, 0.031729687, 0.0, 0.0, 0.0, 0....
3   s3://bucketoc/Test/Pineapple Mini/53_100.jpg  ...  [0.0, 4.7234783, 0.0, 0.0, 0.0, 0.0, 0.1619792...
4  s3://bucketoc/Test/Pineapple Mini/275_100.jpg  ...  [0.0, 4.8610783, 0.0, 0.0, 0.0, 0.0, 0.3299897...

[5 rows x 3 columns]

In [20]:
# df.loc[0,'features'].shape


In [21]:
# df.shape


In [22]:
len(df)


13550

In [23]:
len(df.loc[0,'features'])


1280
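`len(df.loc[0,'features'])` only checks the first row. The 1280 values match the output size of the layer just before MobileNetV2's classification head (a global average pooling). To check every row, a minimal sketch could look like this; `validate_features` is a hypothetical helper, and `expected_dim=1280` comes from the output above.

```python
import numpy as np


def validate_features(feature_vectors, expected_dim=1280):
    """Hypothetical helper: return the positions of the vectors whose length
    differs from expected_dim or that contain NaN/inf values."""
    bad = []
    for i, v in enumerate(feature_vectors):
        arr = np.asarray(v, dtype=np.float32)
        if arr.shape != (expected_dim,) or not np.all(np.isfinite(arr)):
            bad.append(i)
    return bad


# In the notebook (assumption): validate_features(df['features']) should return []
```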

In [24]:
df = df.sample(frac=0.25, random_state=42)


In [25]:
df


                                                  path  ...                                           features
811         s3://bucketoc/Test/Tamarillo/r_325_100.jpg  ...  [0.171594, 0.087930806, 0.0, 0.0, 0.041154783,...
13473        s3://bucketoc/Test/Kumquats/r_281_100.jpg  ...  [0.321412, 0.0, 0.0, 0.0, 0.005705162, 0.0, 0....
12502      s3://bucketoc/Test/Potato Sweet/151_100.jpg  ...  [0.6665364, 0.012092337, 0.018678218, 0.0, 0.1...
8457             s3://bucketoc/Test/Peach 2/93_100.jpg  ...  [0.7146269, 0.21543047, 0.0, 0.0, 0.46155572, ...
6415     s3://bucketoc/Test/Potato Sweet/r2_53_100.jpg  ...  [0.11092062, 0.0, 0.0, 0.0, 0.0, 0.0, 0.200810...
...                                                ...  ...                                                ...
1584              s3://bucketoc/Test/Papaya/66_100.jpg  ...  [2.0129778, 0.108574174, 0.0, 0.0, 0.16584532,...
3592          s3://bucketoc/Test/Raspberry/217_100.jpg  ...  [0.40116325, 0.25088403, 0.051270105, 0.0, 0.2...

### PCA on 25% of the images

In [26]:
'''
DETERMINE THE NUMBER OF PCs NEEDED TO EXPLAIN 80% OF THE VARIANCE
The code below performs a principal component analysis (PCA) on a PySpark
DataFrame built from a pandas DataFrame. It finds the number of principal
components to keep so that the cumulative explained variance reaches 80%.
'''
from pyspark.sql import SparkSession
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import PCA
from pyspark.sql.functions import udf

spark.sparkContext.setLogLevel("ERROR")

# Convert Pandas DataFrame to PySpark DataFrame
df['features'] = df['features'].apply(lambda x: Vectors.dense(x))
df_spark = spark.createDataFrame(df)

# Create a VectorAssembler to combine the "features" column into a single "features_vec" column
vecAssembler = VectorAssembler(inputCols=["features"], outputCol="features_vec")

# Apply the VectorAssembler to transform the PySpark DataFrame
df_spark = vecAssembler.transform(df_spark)

variances = []  # Initialize an empty list to store explained variances
total_features = len(df_spark.select("features").first()[0])  # Get the total number of features
step_size = int(total_features * 0.01)  # Set the step size for iteration (1% of total features)
resultats = []  # Initialize an empty list to store results


for k in range(1, total_features + 1, step_size):
    # Create a PCA instance with 'k' (number of components) set by the current iteration
    pca = PCA(k=k, inputCol="features_vec", outputCol="pcaFeatures")
    
    # Fit the PCA model to the transformed DataFrame
    model = pca.fit(df_spark)
    
    # Calculate the cumulative explained variance by the principal components
    explained_variance = sum(model.explainedVariance)
    variances.append(explained_variance)

    # Append the number of features and explained variance to the results list
    resultats.append((k, explained_variance))

    # If the explained variance reaches 80%, stop the loop   
    if explained_variance >= 0.8:
        break

# Append a final result indicating all features explain 1.0 variance
resultats.append(((total_features, 1.0)))


In [27]:
resultats


[(1, 0.11139354520246436), (13, 0.5251955468092676), (25, 0.6509245769165616), (37, 0.7205916179093765), (49, 0.7660057042190238), (61, 0.7981832309870139), (73, 0.822732460266707), (1280, 1.0)]
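The loop refits a PCA for every tested `k`. Since the proportion of variance explained by each component does not depend on `k`, a single fit with a large enough `k` (for instance 73, which already exceeds 80%) followed by a cumulative sum of `model.explainedVariance` would give all the values at once, and the exact smallest `k` rather than an interpolated one. Here is the same idea in NumPy with a single SVD of the centred data; `n_components_for_variance` is a hypothetical helper, and the toy matrix is built so that its variance ratios are known (9/14, 4/14, 1/14).

```python
import numpy as np


def n_components_for_variance(X, threshold=0.8):
    """Hypothetical helper: smallest number of principal components whose
    cumulative explained variance ratio reaches `threshold`, from one SVD."""
    Xc = X - X.mean(axis=0)                  # centre each feature
    s = np.linalg.svd(Xc, compute_uv=False)  # singular values, in descending order
    ratios = s**2 / np.sum(s**2)             # explained variance ratio of each PC
    cumulative = np.cumsum(ratios)
    # First index where the cumulative ratio is >= threshold, turned into a count
    k = min(int(np.searchsorted(cumulative, threshold)) + 1, len(cumulative))
    return k, cumulative


# Toy data: three orthogonal directions with variances proportional to 9, 4 and 1
X = np.array([[3, 0, 0], [-3, 0, 0],
              [0, 2, 0], [0, -2, 0],
              [0, 0, 1], [0, 0, -1]], dtype=float)
k, cumulative = n_components_for_variance(X, threshold=0.8)
print(k, cumulative)  # k is 2 here: 9/14 < 0.8 <= 13/14
```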

In [28]:
import numpy as np
import math

# Extract data from results list
x_data = [i[0] for i in resultats]
y_data = [i[1]*100 for i in resultats]

x_80 = np.interp(80, y_data, np.round(x_data))


In [29]:
Nb_composants_80 = math.ceil(x_80)
Nb_composants_80


62
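`np.interp` draws a straight line between the two sampled points that bracket 80% (k = 61 with 79.82% and k = 73 with 82.27% in `resultats`), which gives:

$$
x_{80} = 61 + \frac{80 - 79.818}{82.273 - 79.818} \times (73 - 61) \approx 61 + 0.074 \times 12 \approx 61.89,
\qquad \lceil x_{80} \rceil = 62
$$

This is an estimate: the true cumulative variance curve is not linear between the two sampled values of k.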

In [30]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import VectorAssembler

spark.sparkContext.setLogLevel("ERROR")

# Convert Pandas DataFrame to PySpark DataFrame
df['features'] = df['features'].apply(lambda x: Vectors.dense(x))
df_spark = spark.createDataFrame(df)

# Create a VectorAssembler to combine the "features" column into a single "features_vec" column
vecAssembler = VectorAssembler(inputCols=["features"], outputCol="features_vec")

# Apply the VectorAssembler to transform the PySpark DataFrame
df_spark = vecAssembler.transform(df_spark)

# Create a PCA object with 'Nb_composants_80' (number of components)
pca = PCA(k=Nb_composants_80, inputCol="features_vec", outputCol="pcaFeatures")

# Fit the PCA model to the data
model = pca.fit(df_spark)

# Transform the data using the PCA model
result = model.transform(df_spark)

# Drop the 'features_vec' column
result = result.drop('features_vec')

# Display the transformed data
result.show()



+--------------------+-----------------+--------------------+--------------------+
|                path|            label|            features|         pcaFeatures|
+--------------------+-----------------+--------------------+--------------------+
|s3://bucketoc/Tes...|        Tamarillo|[0.17159399390220...|[14.4558652790233...|
|s3://bucketoc/Tes...|         Kumquats|[0.32141199707984...|[-0.1457465248047...|
|s3://bucketoc/Tes...|     Potato Sweet|[0.66653639078140...|[-4.1908676983914...|
|s3://bucketoc/Tes...|          Peach 2|[0.71462690830230...|[3.52180296196950...|
|s3://bucketoc/Tes...|     Potato Sweet|[0.11092062294483...|[-0.6717270222704...|
|s3://bucketoc/Tes...|           Papaya|[1.55914247035980...|[-1.4745985046023...|
|s3://bucketoc/Tes...|         Tomato 2|[0.68183600902557...|[10.4618128708815...|
|s3://bucketoc/Tes...|         Rambutan|[0.0,2.2619524002...|[-2.7053442754509...|
|s3://bucketoc/Tes...|         Tomato 2|[0.97659122943878...|[11.3783808122340...|
|s3:

In [31]:
PATH = 's3://bucketoc'
PATH_Data = PATH+'/Test'
PATH_Result_PCA = PATH+'/Results_PCA'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Results_PCA: '+PATH_Result_PCA)


PATH:        s3://bucketoc
PATH_Data:   s3://bucketoc/Test
PATH_Results_PCA: s3://bucketoc/Results_PCA

In [32]:
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT, DenseVector
from pyspark.sql.types import ArrayType, FloatType

# UDF converting an ML Vector into a list of floats (null values stay null)
vector_to_list = udf(lambda v: v.toArray().tolist() if v is not None else None,
                     ArrayType(FloatType()))

# Apply the conversion
result = result.withColumn("features", vector_to_list(result["features"]))
result = result.withColumn("pcaFeatures", vector_to_list(result["pcaFeatures"]))

# Write to Parquet
result.write.mode('overwrite').parquet(PATH_Result_PCA)


In [33]:
df = pd.read_parquet(PATH_Result_PCA, engine='pyarrow')
df


                                                 path  ...                                        pcaFeatures
0          s3://bucketoc/Test/Tamarillo/r_325_100.jpg  ...  [14.455865, -0.090794764, -5.791322, -7.412672...
1           s3://bucketoc/Test/Kumquats/r_281_100.jpg  ...  [-0.14574653, -4.739161, -6.6349807, -6.376461...
2         s3://bucketoc/Test/Potato Sweet/151_100.jpg  ...  [-4.190868, 7.7748513, -2.2409966, -10.781103,...
3               s3://bucketoc/Test/Peach 2/93_100.jpg  ...  [3.521803, -4.5442133, -4.5502286, -1.5462464,...
4       s3://bucketoc/Test/Potato Sweet/r2_53_100.jpg  ...  [-0.671727, 1.085584, -8.061124, -9.212188, 8....
...                                               ...  ...                                                ...
3383             s3://bucketoc/Test/Papaya/66_100.jpg  ...  [-2.585064, 1.0636737, -8.012012, -8.333485, 2...
3384         s3://bucketoc/Test/Raspberry/217_100.jpg  ...  [-2.6232076, 3.3355765, 4.1145906, -9.222563, ...
3385      
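Casting the vectors to `ArrayType(FloatType())` stores 32-bit floats, which keep about 7 significant digits. This is why the first `pcaFeatures` value of the Tamarillo image reads `14.4558652790233...` in `result.show()` but `14.455865` once read back from Parquet. Here is a small NumPy check of that rounding, using the (truncated) value displayed above; `to_float32_list` is a hypothetical helper that mirrors the cast performed by `vector_to_list`.

```python
import numpy as np


def to_float32_list(values):
    """Hypothetical helper: round Python floats to float32 precision,
    as storing them in a FloatType column does."""
    return [float(np.float32(v)) for v in values]


shown = 14.4558652790233               # truncated value displayed by result.show()
stored = to_float32_list([shown])[0]
print(stored, abs(stored - shown))     # rounding error below 1e-6 at this magnitude
```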

### Conclusion

By integrating the different processing building blocks (reading the images from S3, extracting features with MobileNetV2 inside a pandas UDF, reducing dimensionality with a PCA and writing the results back to S3 as Parquet), we have not only laid a solid foundation for our mobile application but also paved the way for the future scalability of our Big Data architecture. Combining PySpark with AWS EMR lets us process the data efficiently while ensuring compliance with the regulations in force.
