1- Import des librairies

In [None]:
import pyspark.pandas as ps

In [148]:
import io
import os
from typing import Iterator

import numpy as np
import pandas as pd
from PIL import Image

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import Model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

# transform
from pyspark.ml.feature import PCA
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType

In [None]:
# Input and output locations, both resolved against the current working directory.
PATH = os.getcwd()
PATH_Data = f'{PATH}/data/test'
PATH_Result = f'{PATH}/data/results'
print(f'PATH:        {PATH}\nPATH_Data:   {PATH_Data}\nPATH_Result: {PATH_Result}')

In [150]:
# Create (or reuse) the Spark session for this notebook: app name 'P8',
# 'local' master, Parquet written in the legacy format.
spark = (
    SparkSession.builder
    .appName('P8')
    .master('local')
    .config("spark.sql.parquet.writeLegacyFormat", 'true')
    .getOrCreate()
)

In [151]:
# Keep a handle on the session's SparkContext; it is used later to broadcast the model weights.
sc = spark.sparkContext

In [152]:
# Display the session object as the cell output.
spark

- Lire les images JPG avec Spark au format binaire (`binaryFile`)

In [153]:
# Load every *.jpg found under PATH_Data (sub-directories included) with the binaryFile source.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

- Ajouter une colonne label au dataframe

In [None]:
# The label is the second-to-last '/'-separated component of the file path,
# i.e. the name of the directory that directly contains the image.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# printSchema() and show() write to stdout themselves and return None;
# wrapping them in print() only appended a spurious "None" line.
images.printSchema()
images.select('path', 'label').show(5, False)

In [155]:
# Full MobileNetV2 (classification head included) with ImageNet weights,
# built on the driver for 224x224 3-channel inputs.
model = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=True,
    weights='imagenet',
)

In [156]:
# Feature extractor: same input as `model`, but the output is taken from the
# second-to-last layer (model.layers[-2]) instead of the final layer.
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [157]:
# Print the layer-by-layer summary of the truncated model.
new_model.summary()

Model: "model_3"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_4 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_4[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                           

* création d'un objet de diffusion (brodcast_weights) contenant les poids du modèle (new_model) afin de les partager efficacement entre les nœuds du cluster Spark

In [158]:
# Broadcast the weights of new_model through the SparkContext;
# model_fn() reads them back via brodcast_weights.value.
brodcast_weights = sc.broadcast(new_model.get_weights())

In [159]:
def model_fn():
    """
    Build the feature-extraction model whose weights come from the broadcast.

    Returns a MobileNetV2 truncated at its second-to-last layer (the final
    layer is dropped), with every layer frozen and its weights set from the
    ``brodcast_weights`` broadcast variable.

    :return: a Keras ``Model`` mapping (224, 224, 3) inputs to the output of
             ``model.layers[-2]``.
    """
    # weights=None builds the architecture only. The pretrained values are
    # supplied by the broadcast variable below, so loading the ImageNet
    # checkpoint here would be redundant work whose result is immediately
    # overwritten by set_weights().
    model = MobileNetV2(weights=None,
                        include_top=True,  # keep the full graph so that layers[-2] is well defined
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False   # freeze the layers: the model is used for inference only

    new_model = Model(inputs=model.input,    # same input as the original model
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [10]:
# Environment check: ask conda to list the fsspec package.
# NOTE(review): this cell's execution count (10) is out of sequence with its
# neighbours and nothing below reads its output; it looks like leftover debugging.
!conda list  fsspec


# packages in environment at /home/LHadjemi/mambaforge/envs/database:
#
# Name                    Version                   Build  Channel


In [160]:
def preprocess(content):
    """
    Decode raw image bytes and prepare them for MobileNetV2.

    :param content: encoded image file content (bytes).
    :return: the array of the image, converted to RGB, resized to 224x224 and
             passed through MobileNetV2's ``preprocess_input``.
    """
    # Force three channels: grayscale, palette, RGBA or CMYK files would
    # otherwise yield arrays whose channel count differs from the
    # (224, 224, 3) input the model is built for, breaking the batch stacking.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    # Convert the PIL image to a numpy array.
    arr = img_to_array(img)
    return preprocess_input(arr)

In [161]:
def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.

    :param model: object exposing ``predict(batch)``; called once on the whole batch.
    :param content_series: pd.Series whose values are raw image bytes.
    :return: a pd.Series of image features, one flattened 1-D array per image
             (an empty Series when ``content_series`` is empty).
    """
    # np.stack() raises on an empty sequence, so short-circuit empty batches.
    if content_series.empty:
        return pd.Series([], dtype=object)

    # Preprocess every raw image and stack them into a single batch array.
    # (Named `batch` rather than `input` to avoid shadowing the builtin.)
    batch = np.stack(list(content_series.map(preprocess)))

    preds = model.predict(batch)
    # For some layers the output features are multi-dimensional tensors.
    # Flatten each one to a vector so it is easy to store in a Spark DataFrame.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

In [None]:
@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    Scalar Iterator pandas UDF wrapping our featurization function.

    The UDF kind is declared through the ``Iterator[pd.Series] -> Iterator[pd.Series]``
    type hints, which replace the deprecated ``PandasUDFType.SCALAR_ITER`` argument.
    The decorator specifies that this returns a Spark DataFrame column of type
    ArrayType(FloatType).

    :param content_series_iter: an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    :return: yields one pandas Series of feature vectors per input batch.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

In [163]:
# Run the featurization UDF over the images, spread across 20 partitions.
# The DataFrame is lazy, so without caching each action that reads
# features_df (the Parquet write, show(), the PCA fit/transform, ...) can
# re-run the CNN inference. cache() keeps the result once the first action
# has computed it.
features_df = (
    images.repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
    .cache()
)

In [None]:
# Show where the extracted features are about to be written.
print(PATH_Result)

In [165]:
# Write the features to Parquet under PATH_Result, replacing any previous output.
# Being an action, this is what actually triggers the featurization.
features_df.write.mode("overwrite").parquet(PATH_Result)

2023-12-29 11:11:15.173568: W tensorflow/compiler/xla/stream_executor/gpu/asm_compiler.cc:109] Couldn't get ptxas version : FAILED_PRECONDITION: Couldn't get ptxas/nvlink version string: INTERNAL: Couldn't invoke ptxas --version
2023-12-29 11:11:15.174382: W tensorflow/compiler/xla/stream_executor/gpu/redzone_allocator.cc:317] INTERNAL: Failed to launch ptxas
Relying on driver to perform ptx compilation. 
Modify $PATH to customize ptxas location.
This message will be only logged once.
                                                                                

In [166]:
# Read the Parquet output back into a pandas DataFrame (pyarrow engine) to inspect it.
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [167]:
# Preview the first rows of the extracted features.
df.head()

Unnamed: 0,path,label,features
0,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"[1.2062845, 0.00094540615, 0.0, 0.0, 0.0828499..."
1,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"[0.28114322, 0.0, 0.015231324, 0.0, 0.0, 0.0, ..."
2,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"[0.36894366, 0.0020400512, 0.13478272, 0.03039..."
3,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"[1.6595063, 0.0, 0.12816602, 0.003942761, 0.01..."
4,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"[0.64518535, 0.0, 0.0017532817, 0.0, 0.0465762..."


In [168]:
# Shape of the feature vector stored for the first row.
df.loc[0,'features'].shape

(1280,)

#### Reducer

In [169]:
# Check the Spark schema of features_df before the dimensionality reduction.
features_df.printSchema()

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features: array (nullable = true)
 |    |-- element: float (containsNull = true)



In [170]:
# Preview the first 5 rows of features_df.
features_df.show(5)

[Stage 6:>                                                          (0 + 1) / 1]

+--------------------+-------+--------------------+
|                path|  label|            features|
+--------------------+-------+--------------------+
|file:/media/LHadj...|Avocado|[1.2062845, 9.454...|
|file:/media/LHadj...|Avocado|[0.28114322, 0.0,...|
|file:/media/LHadj...|Avocado|[0.36894366, 0.00...|
|file:/media/LHadj...|Avocado|[1.6595063, 0.0, ...|
|file:/media/LHadj...|Avocado|[0.64518535, 0.0,...|
+--------------------+-------+--------------------+
only showing top 5 rows



                                                                                

In [171]:
# Number of partitions backing features_df.
print(features_df.rdd.getNumPartitions())

20


In [172]:
# Spark ML's PCA reads a Vector column, so the array column must be converted first.
def _to_dense_vector(values):
    """Wrap a sequence of numbers into a dense ML vector."""
    return Vectors.dense(values)


array_to_vector_udf = udf(_to_dense_vector, VectorUDT())

In [173]:
# Add 'cnn_vectors': the 'features' array column converted to an ML Vector column.
vectorized_df = features_df.withColumn('cnn_vectors', array_to_vector_udf('features'))

In [174]:
# Preview the first 5 rows, with long values truncated.
vectorized_df.show(5, True)

2023-12-29 11:12:09.898341: W tensorflow/compiler/xla/stream_executor/gpu/asm_compiler.cc:109] Couldn't get ptxas version : FAILED_PRECONDITION: Couldn't get ptxas/nvlink version string: INTERNAL: Couldn't invoke ptxas --version
2023-12-29 11:12:09.899391: W tensorflow/compiler/xla/stream_executor/gpu/redzone_allocator.cc:317] INTERNAL: Failed to launch ptxas
Relying on driver to perform ptx compilation. 
Modify $PATH to customize ptxas location.
This message will be only logged once.


+--------------------+-------+--------------------+--------------------+
|                path|  label|            features|         cnn_vectors|
+--------------------+-------+--------------------+--------------------+
|file:/media/LHadj...|Avocado|[1.2062845, 9.454...|[1.20628452301025...|
|file:/media/LHadj...|Avocado|[0.28114322, 0.0,...|[0.28114321827888...|
|file:/media/LHadj...|Avocado|[0.36894366, 0.00...|[0.36894366145133...|
|file:/media/LHadj...|Avocado|[1.6595063, 0.0, ...|[1.65950632095336...|
|file:/media/LHadj...|Avocado|[0.64518535, 0.0,...|[0.64518535137176...|
+--------------------+-------+--------------------+--------------------+
only showing top 5 rows



                                                                                

### pca:

In [175]:
# Reduce the CNN feature vectors with PCA, keeping k=20 principal components.
# k is an upper bound chosen by hand; tune it to find the adequate number of components.
pca = PCA(k=20, inputCol='cnn_vectors', outputCol='pca_vectors')
# NOTE(review): this rebinds `model`, which earlier in the notebook holds the
# Keras MobileNetV2 network; from here on `model` is the fitted PCA model.
model = pca.fit(vectorized_df)

2023-12-29 11:12:18.679689: W tensorflow/compiler/xla/stream_executor/gpu/asm_compiler.cc:109] Couldn't get ptxas version : FAILED_PRECONDITION: Couldn't get ptxas/nvlink version string: INTERNAL: Couldn't invoke ptxas --version
2023-12-29 11:12:18.680575: W tensorflow/compiler/xla/stream_executor/gpu/redzone_allocator.cc:317] INTERNAL: Failed to launch ptxas
Relying on driver to perform ptx compilation. 
Modify $PATH to customize ptxas location.
This message will be only logged once.
                                                                                

In [176]:
# Apply the fitted PCA model; the result is written to the 'pca_vectors' column.
reduced_df = model.transform(vectorized_df)

In [177]:
# Preview the first 5 rows, now including the PCA output column.
reduced_df.show(5, True)

[Stage 26:>                                                         (0 + 1) / 1]

+--------------------+-------+--------------------+--------------------+--------------------+
|                path|  label|            features|         cnn_vectors|         pca_vectors|
+--------------------+-------+--------------------+--------------------+--------------------+
|file:/media/LHadj...|Avocado|[1.2062845, 9.454...|[1.20628452301025...|[-16.951059403649...|
|file:/media/LHadj...|Avocado|[0.28114322, 0.0,...|[0.28114321827888...|[-17.011035236793...|
|file:/media/LHadj...|Avocado|[0.36894366, 0.00...|[0.36894366145133...|[-15.204871244531...|
|file:/media/LHadj...|Avocado|[1.6595063, 0.0, ...|[1.65950632095336...|[-15.122530461000...|
|file:/media/LHadj...|Avocado|[0.64518535, 0.0,...|[0.64518535137176...|[-1.7547172057931...|
+--------------------+-------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [178]:
# Drop the raw 'features' array column; 'cnn_vectors' already carries the same values as a Vector.
reduced_df = reduced_df.drop('features')

In [179]:
# Preview again to confirm the 'features' column is gone.
reduced_df.show(5, True)

2023-12-29 11:13:54.074154: W tensorflow/compiler/xla/stream_executor/gpu/asm_compiler.cc:109] Couldn't get ptxas version : FAILED_PRECONDITION: Couldn't get ptxas/nvlink version string: INTERNAL: Couldn't invoke ptxas --version
2023-12-29 11:13:54.075446: W tensorflow/compiler/xla/stream_executor/gpu/redzone_allocator.cc:317] INTERNAL: Failed to launch ptxas
Relying on driver to perform ptx compilation. 
Modify $PATH to customize ptxas location.
This message will be only logged once.


+--------------------+-------+--------------------+--------------------+
|                path|  label|         cnn_vectors|         pca_vectors|
+--------------------+-------+--------------------+--------------------+
|file:/media/LHadj...|Avocado|[1.20628452301025...|[-16.951059403649...|
|file:/media/LHadj...|Avocado|[0.28114321827888...|[-17.011035236793...|
|file:/media/LHadj...|Avocado|[0.36894366145133...|[-15.204871244531...|
|file:/media/LHadj...|Avocado|[1.65950632095336...|[-15.122530461000...|
|file:/media/LHadj...|Avocado|[0.64518535137176...|[-1.7547172057931...|
+--------------------+-------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [180]:
# From Vectors back to plain arrays: turn the PCA output into an array<float> column.
def _vector_to_list(vector):
    """Return the values of an ML vector as a Python list."""
    return vector.toArray().tolist()


vector_to_array_udf = udf(_vector_to_list, ArrayType(FloatType()))

In [181]:
# Add 'features_array': the PCA vectors converted to an array of floats.
data_final= reduced_df.withColumn('features_array', vector_to_array_udf('pca_vectors'))

In [182]:
# Preview the first 5 rows of the final DataFrame.
data_final.show(5, True)

2023-12-29 11:14:05.541644: W tensorflow/compiler/xla/stream_executor/gpu/asm_compiler.cc:109] Couldn't get ptxas version : FAILED_PRECONDITION: Couldn't get ptxas/nvlink version string: INTERNAL: Couldn't invoke ptxas --version
2023-12-29 11:14:05.542625: W tensorflow/compiler/xla/stream_executor/gpu/redzone_allocator.cc:317] INTERNAL: Failed to launch ptxas
Relying on driver to perform ptx compilation. 
Modify $PATH to customize ptxas location.
This message will be only logged once.


+--------------------+-------+--------------------+--------------------+--------------------+
|                path|  label|         cnn_vectors|         pca_vectors|      features_array|
+--------------------+-------+--------------------+--------------------+--------------------+
|file:/media/LHadj...|Avocado|[1.20628452301025...|[-16.951059403649...|[-16.95106, -4.37...|
|file:/media/LHadj...|Avocado|[0.28114321827888...|[-17.011035236793...|[-17.011036, -13....|
|file:/media/LHadj...|Avocado|[0.36894366145133...|[-15.204871244531...|[-15.204871, 0.99...|
|file:/media/LHadj...|Avocado|[1.65950632095336...|[-15.122530461000...|[-15.122531, 2.56...|
|file:/media/LHadj...|Avocado|[0.64518535137176...|[-1.7547172057931...|[-1.7547172, -2.3...|
+--------------------+-------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

#### Storage:

In [183]:
# Write the final DataFrame to Parquet, replacing any previous output.
# The target is relative to the current working directory (not PATH_Result).
data_final.write.mode('overwrite').parquet("./output/features")

2023-12-29 11:14:14.269869: W tensorflow/compiler/xla/stream_executor/gpu/asm_compiler.cc:109] Couldn't get ptxas version : FAILED_PRECONDITION: Couldn't get ptxas/nvlink version string: INTERNAL: Couldn't invoke ptxas --version
2023-12-29 11:14:14.273434: W tensorflow/compiler/xla/stream_executor/gpu/redzone_allocator.cc:317] INTERNAL: Failed to launch ptxas
Relying on driver to perform ptx compilation. 
Modify $PATH to customize ptxas location.
This message will be only logged once.
                                                                                

In [184]:
# Read the final Parquet output back with pandas (pyarrow engine) to verify it.
df_final = pd.read_parquet('./output/features', engine='pyarrow')

In [185]:
# Compact summary of the frame; memory_usage="deep" also measures the object columns' contents.
df_final.info(verbose=False, memory_usage="deep")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 143 entries, 0 to 142
Columns: 5 entries, path to features_array
dtypes: object(5)
memory usage: 113.1 KB


In [186]:
# Preview the first 5 rows of the final result.
df_final.head(5)

Unnamed: 0,path,label,cnn_vectors,pca_vectors,features_array
0,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","[-16.95106, -4.3774457, -11.170761, -10.679921..."
1,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","[-17.011036, -13.817756, -10.5200405, 2.280178..."
2,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","[-15.204871, 0.99675465, -8.867973, -0.4300575..."
3,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","[-15.122531, 2.5644705, -9.907209, -1.5638206,..."
4,file:/media/LHadjemi/Data/deployer-un-modele-d...,Avocado,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va...","[-1.7547172, -2.3506665, -12.355202, -4.484647..."


In [187]:
# Stop the Spark session now that the pipeline is finished.
spark.stop()