# 1 : Ouverture Pyspark et Librairies

In [1]:
# L'exécution de cette cellule démarre l'application Spark

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1697544089689_0001,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
%%info

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
0,application_1697544089689_0001,pyspark,idle,Link,Link,,✔


In [3]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import PCA
from pyspark.ml.functions import vector_to_array

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 2 : Création des PATHs

In [4]:
PATH = 's3://ocrds-projet8'
PATH_Data = PATH+'/TestLocal'
PATH_Result = PATH+'/Resultssimples'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:        s3://ocrds-projet8
PATH_Data:   s3://ocrds-projet8/TestLocal
PATH_Result: s3://ocrds-projet8/Resultssimples

# 3 : Chargement des données

In [5]:
# Chargement des données avec la bonne extention et dans tous les sous dossiers :
images = spark.read.format("binaryFile").option("pathGlobFilter", "*.jpg").option("recursiveFileLookup", "true").load(PATH_Data)
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))

# impression des résultats :
print(images.select('path','label').show(5, False))

# Longueur de la DF :
images.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------------------------------------------------+--------------+
|path                                                  |label         |
+------------------------------------------------------+--------------+
|s3://ocrds-projet8/TestLocal/Apple Golden 1/84_100.jpg|Apple Golden 1|
|s3://ocrds-projet8/TestLocal/Apple Golden 1/86_100.jpg|Apple Golden 1|
|s3://ocrds-projet8/TestLocal/Apple Golden 1/85_100.jpg|Apple Golden 1|
|s3://ocrds-projet8/TestLocal/Apple Golden 1/82_100.jpg|Apple Golden 1|
|s3://ocrds-projet8/TestLocal/Apple Golden 1/81_100.jpg|Apple Golden 1|
+------------------------------------------------------+--------------+
only showing top 5 rows

None
128

# 4 : Création des définitions (modèles ML)

In [6]:
# Ouverture du model MobileNetV2 dans la bonne dimension :
model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))

# Suppression de la derniére couche du model (ici -2)
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

brodcast_weights = sc.broadcast(new_model.get_weights())

# Définition générale (ouverture du modèle et brodcast des weights ) :
def model_fn():
    model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))
    new_model = Model(inputs=model.input, outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5

In [7]:
# Definition de redimensionnement des images en 224/224 et non 100/100 :
def preprocess(content):
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

# Récupération des résultats des photos passées dans le model aprés être passée par le preprocess, puis flatten des résultats :
def featurize_series(model, content_series):
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)


@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



# 5 : Extraction des features

In [8]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# Récupération des features via les def précédentes :
features_df = images.repartition(20).select(col("path"), col("label"), featurize_udf("content").alias("features"))

# Montre le tableau de résultats avec les features :
print(features_df.show(6))

# Longueur de la DF :
features_df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+--------------------+
|                path|             label|            features|
+--------------------+------------------+--------------------+
|s3://ocrds-projet...|       Apple Red 3|[0.75022817, 0.06...|
|s3://ocrds-projet...|Apple Red Yellow 1|[0.34025666, 0.58...|
|s3://ocrds-projet...|       Apple Red 1|[0.25895342, 0.0,...|
|s3://ocrds-projet...|       Apple Red 2|[0.8241742, 0.308...|
|s3://ocrds-projet...|Apple Red Yellow 2|[1.0047052, 0.0, ...|
|s3://ocrds-projet...|Apple Red Yellow 2|[1.4089472, 0.0, ...|
+--------------------+------------------+--------------------+
only showing top 6 rows

None
128

# 6 : Application du PCA

In [10]:
# Définition globale de cette partie là :
def PCASpark(df) :
    # Vectorisation des features pour application du PCA :
    to_vector = udf(lambda a: Vectors.dense(a), VectorUDT())
    features_df_vector = df.withColumn("features", to_vector("features"))

    # PCA des données :
    pca = PCA(k = 100, inputCol="features", outputCol="pca_features")
    model = pca.fit(features_df_vector)
    pca_feat_df = model.transform(features_df_vector)
    features_df_PCA = pca_feat_df.select("path", "label", "pca_features")

    # Dévectorisation des données :
    dffinal = features_df_PCA.withColumn('pca_features', vector_to_array('pca_features'))
    return dffinal

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# Utilisation de la définition :
df = PCASpark(features_df)
df.show(6)

# Longueur de la DF :
df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+--------------------+
|                path|             label|        pca_features|
+--------------------+------------------+--------------------+
|s3://ocrds-projet...|       Apple Red 3|[-13.374563421283...|
|s3://ocrds-projet...|Apple Red Yellow 1|[-12.176831876945...|
|s3://ocrds-projet...|       Apple Red 1|[-12.031045397776...|
|s3://ocrds-projet...|       Apple Red 2|[-9.1496780547485...|
|s3://ocrds-projet...|Apple Red Yellow 2|[1.32605706003507...|
|s3://ocrds-projet...|Apple Red Yellow 2|[1.73857882420740...|
+--------------------+------------------+--------------------+
only showing top 6 rows

128

# 7 : Enregistrement sous format Parquet

In [12]:
# Enregristrement des données :
df.write.mode("overwrite").parquet(PATH_Result)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# 8 : Ouverture des données simples

In [13]:
# Chargement des données depuis mon cloud :
df = pd.read_parquet(PATH_Result, engine='pyarrow')

# Nb de données dans la colonne PCA_Features :
df.loc[0,'pca_features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(100,)

In [14]:
# Split de la colonnes features PCA pour avoir une donnée par colonne :
Data = pd.DataFrame()
for index in range(len(df)):
  OneRow = pd.DataFrame(df['pca_features'][index]).T
  Data = pd.concat([Data, OneRow])
Data = Data.reset_index()

del df['pca_features']
df = pd.merge(df, Data, right_index = True, left_index = True)
print(df.shape)
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(128, 103)
                                                path  ...        99
0  s3://ocrds-projet8/TestLocal/Apple Red 3/53_10...  ... -0.641341
1  s3://ocrds-projet8/TestLocal/Apple Red Yellow ...  ...  0.073453
2  s3://ocrds-projet8/TestLocal/Apple Red 1/34_10...  ... -0.389130
3  s3://ocrds-projet8/TestLocal/Apple Red 2/38_10...  ...  0.266178
4  s3://ocrds-projet8/TestLocal/Apple Red Yellow ...  ... -0.069371

[5 rows x 103 columns]

In [21]:
# Enregistrement d'un fichier CSV dans S3 :
PATH_CSV = 's3://ocrds-projet8/CSV'
df.to_csv(PATH_CSV + '/' + 'P8.csv', index=False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# Vérification du CSV :
df = pd.read_csv(PATH_CSV + '/' + 'P8.csv')
df.head()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                                path  ...        99
0  s3://ocrds-projet8/TestLocal/Apple Red 3/53_10...  ... -0.641341
1  s3://ocrds-projet8/TestLocal/Apple Red Yellow ...  ...  0.073453
2  s3://ocrds-projet8/TestLocal/Apple Red 1/34_10...  ... -0.389130
3  s3://ocrds-projet8/TestLocal/Apple Red 2/38_10...  ...  0.266178
4  s3://ocrds-projet8/TestLocal/Apple Red Yellow ...  ... -0.069371

[5 rows x 103 columns]