# Projet 9 - Réalisez un traitement dans un environnement Big Data sur le Cloud

## Import des librairies

In [1]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA
from pyspark.sql import functions as F

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1710164625461_0002,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1,application_1710164625461_0002,pyspark,idle,Link,Link,,✔


## Chargement des images et enregistrement des résultats

In [3]:
# Bucket root, plus the input (images) and output (features) locations derived from it.
PATH = 's3://p8marionflore'
PATH_Data = f'{PATH}/Test'
PATH_Results = f'{PATH}/Results'

# One labelled line per location.
print('PATH:        ' + PATH,
      'PATH_Data:   ' + PATH_Data,
      'PATH_Results: ' + PATH_Results,
      sep='\n')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://p8marionflore
PATH_Data:   s3://p8marionflore/Test
PATH_Results: s3://p8marionflore/Results

### Chargement des données

In [4]:
# Read every *.jpg found under PATH_Data (sub-directories included) with the
# binaryFile source.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# Preview the first five rows of the loaded DataFrame.
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8marionflor...|2024-03-08 20:02:01|  7353|[FF D8 FF E0 00 1...|
|s3://p8marionflor...|2024-03-08 20:00:29|  7350|[FF D8 FF E0 00 1...|
|s3://p8marionflor...|2024-03-08 19:59:56|  7349|[FF D8 FF E0 00 1...|
|s3://p8marionflor...|2024-03-08 20:02:49|  7348|[FF D8 FF E0 00 1...|
|s3://p8marionflor...|2024-03-08 20:04:30|  7328|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

### Ajout du label des images et sélection de colonnes

In [7]:
# The label is the name of the image's parent directory, i.e. the
# second-to-last segment of the path once split on '/'.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# printSchema() and show() write to stdout themselves and return None, so they
# are called directly: wrapping them in print() only adds a stray "None" line.
images.printSchema()
images.select('path', 'label').show(5, False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------------+----------+
|path                                            |label     |
+------------------------------------------------+----------+
|s3://p8marionflore/Test/Watermelon/r_106_100.jpg|Watermelon|
|s3://p8marionflore/Test/Watermelon/r_109_100.jpg|Watermelon|
|s3://p8marionflore/Test/Watermelon/r_108_100.jpg|Watermelon|
|s3://p8marionflore/Test/Watermelon/r_107_100.jpg|Watermelon|
|s3://p8marionflore/Test/Watermelon/r_95_100.jpg |Watermelon|
+------------------------------------------------+----------+
only showing top 5 rows

None

## Préparation du modèle

In [8]:
# Driver-side MobileNetV2 loaded with the ImageNet weights; include_top=True
# keeps the complete network, which is truncated in the next step.
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [9]:
# Feature extractor: same input as `model`, output taken from its
# second-to-last layer (the last layer is left out).
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Broadcast the extractor weights through the SparkContext; model_fn() reads
# them back with `.value`.
# NOTE(review): the name is spelled "brodcast"; renaming it means updating
# model_fn() in the same change.
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Print the layer-by-layer summary of the truncated model.
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_1[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']        

In [12]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor used inside the pandas UDF.

    The network is created with ``weights=None``: every weight of the
    returned model is overwritten by the broadcast weights, so asking Keras
    for the ImageNet weights here would only add a redundant weight load to
    each call.

    :return: a Keras ``Model`` with the input of MobileNetV2 and the output
             of its second-to-last layer (final layer dropped), carrying the
             weights stored in ``brodcast_weights``.
    """
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    # Inference only: no layer is meant to be trained.
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    # Same architecture as the driver-side extractor, so the weight list lines up.
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Définition du processus de chargement des images et application de leur featurisation à travers l'utilisation de pandas UDF

In [13]:
def preprocess(content):
    """
    Turn the raw bytes of one image file into a MobileNetV2-ready array.

    The image is converted to RGB before resizing so that grayscale, palette,
    CMYK or RGBA files also produce three channels, which is what the
    (224, 224, 3) model input requires.

    :param content: raw bytes of an image file.
    :return: the value of ``preprocess_input`` applied to the 224x224 RGB
             image array.
    """
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Run the given model over a batch of raw images.

    :param model: object exposing ``predict`` on a stacked batch.
    :param content_series: pd.Series whose elements are raw image contents.
    :return: a pd.Series holding one flattened feature vector per image.
    """
    # Preprocess each element, then stack the results into a single batch.
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # Predictions may be multi-dimensional depending on the output layer;
    # flatten each one so it can be stored as a plain vector.
    flattened = []
    for prediction in predictions:
        flattened.append(prediction.flatten())
    return pd.Series(flattened)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    Scalar Iterator pandas UDF that applies featurize_series to each batch.

    The decorator declares the result as a column of type 'array<float>'.

    :param content_series_iter: iterator over batches, each batch being a
                                pandas Series of image data.
    """
    # Build the model a single time per call and reuse it for every batch,
    # so the cost of creating it is not paid once per batch.
    extractor = model_fn()
    for batch in content_series_iter:
        yield featurize_series(extractor, batch)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



## Exécution des actions d'extraction des features

In [14]:
# Redistribute the images over 24 partitions, keep path and label, and add the
# UDF output computed from "content" as the "features" column.
features_df = (
    images
    .repartition(24)
    .select(
        "path",
        "label",
        featurize_udf("content").alias("features"),
    )
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [15]:
# Show the location where the features are about to be written.
print(PATH_Results)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

s3://p8marionflore/Results

In [17]:
# Write path, label and features as Parquet under PATH_Results;
# mode("overwrite") replaces any output already present there.
features_df.write.mode("overwrite").parquet(PATH_Results)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Chargement des données enregistrées et validation du résultat

In [19]:
# Load the Parquet output into a pandas DataFrame with the pyarrow engine.
df = pd.read_parquet(PATH_Results, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Preview of the first rows
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...                                           features
0   s3://p8marionflore/Test/Watermelon/r_104_100.jpg  ...  [0.18860447, 0.35067347, 0.066528104, 0.0, 0.6...
1    s3://p8marionflore/Test/Watermelon/r_93_100.jpg  ...  [0.09511647, 0.057544388, 0.0, 0.08459367, 1.0...
2    s3://p8marionflore/Test/Watermelon/r_69_100.jpg  ...  [1.164646, 0.31732613, 0.0, 0.0, 0.6920596, 0....
3  s3://p8marionflore/Test/Pineapple Mini/272_100...  ...  [0.0, 4.8245773, 0.0, 0.0, 0.0, 0.0, 0.0933936...
4     s3://p8marionflore/Test/Watermelon/254_100.jpg  ...  [0.010156719, 0.1815322, 0.0, 0.0, 1.7499609, ...

[5 rows x 3 columns]

In [21]:
# Shape of the first entry of the features column (an array)
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [22]:
# Shape of df (rows, columns)
df.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(22433, 3)

## Réduction de dimension

In [23]:
# Vectorise the features column: wrap each array in a dense ML vector so it can
# be used as the PCA input column.
to_vector = F.udf(lambda x: Vectors.dense(x), VectorUDT())

# Start from the Parquet output written earlier rather than from features_df.
# features_df is a lazy plan, so every action on a DataFrame derived from it
# (show, PCA fit, transform, toPandas) would re-run the MobileNetV2 inference
# over all the images.
sparkDF = (
    spark.read.parquet(PATH_Results)
    .select('path', 'label', 'features', to_vector("features").alias("features_vec"))
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
# Preview of the DataFrame after the transformation
sparkDF.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+--------------------+
|                path|         label|            features|        features_vec|
+--------------------+--------------+--------------------+--------------------+
|s3://p8marionflor...|Pineapple Mini|[0.0020795355, 4....|[0.00207953550852...|
|s3://p8marionflor...|    Watermelon|[0.7753587, 0.245...|[0.77535867691040...|
|s3://p8marionflor...|    Watermelon|[0.64753145, 0.34...|[0.64753144979476...|
|s3://p8marionflor...|    Watermelon|[0.05196944, 0.11...|[0.05196943879127...|
|s3://p8marionflor...|     Raspberry|[0.010020801, 0.3...|[0.01002080086618...|
+--------------------+--------------+--------------------+--------------------+
only showing top 5 rows

In [25]:
# PCA keeping two components (k=2): fit on the vectorised features, then
# project the same DataFrame.
pcaSparkEstimator = PCA(
    k=2,
    inputCol="features_vec",
    outputCol="pca_Features",
)
pca = pcaSparkEstimator.fit(sparkDF)
pca_matrix = pca.transform(sparkDF)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# Preview of the DataFrame after dimensionality reduction
pca_matrix.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+--------------------+--------------------+
|                path|         label|            features|        features_vec|        pca_Features|
+--------------------+--------------+--------------------+--------------------+--------------------+
|s3://p8marionflor...|    Watermelon|[0.18860447, 0.35...|[0.18860447406768...|[-1.9308853812663...|
|s3://p8marionflor...|    Watermelon|[0.09511647, 0.05...|[0.09511646628379...|[-2.4230607062742...|
|s3://p8marionflor...|Pineapple Mini|[0.0020795355, 4....|[0.00207953550852...|[-6.7759678423393...|
|s3://p8marionflor...|Pineapple Mini|[0.0, 4.8245773, ...|[0.0,4.8245773315...|[-4.1104438005963...|
|s3://p8marionflor...|    Watermelon|[0.010156719, 0.1...|[0.01015671901404...|[-2.1728261646303...|
+--------------------+--------------+--------------------+--------------------+--------------------+
only showing top 5 rows

In [27]:
# Keep only the columns needed for the export
pca_matrix_final = pca_matrix.select('path','label','pca_Features')

# Preview of the first 5 rows
pca_matrix_final.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------+--------------------+
|                path|         label|        pca_Features|
+--------------------+--------------+--------------------+
|s3://p8marionflor...|    Watermelon|[-1.9308853812663...|
|s3://p8marionflor...|    Watermelon|[-2.4230607062742...|
|s3://p8marionflor...|Pineapple Mini|[-6.7759678423393...|
|s3://p8marionflor...|    Watermelon|[-4.4761678492822...|
|s3://p8marionflor...|     Raspberry|[0.38324143415740...|
+--------------------+--------------+--------------------+
only showing top 5 rows

## Export au format CSV

In [29]:
# Collect path, label and the PCA projection into pandas and write them as CSV.
# The destination is built from PATH instead of repeating the bucket name, so
# changing PATH in the configuration cell relocates this output as well.
pca_matrix_final.toPandas().to_csv(PATH + '/Final_results.csv')