In [1]:
# standard library
import io
import os
import time
from typing import Iterator

# third-party scientific stack
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from PIL import Image

# pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import ArrayType, IntegerType, FloatType, StringType
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.ml.feature import StandardScaler,PCA

# tensorflow
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model

2023-04-13 16:17:27.002257: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-04-13 16:17:27.713341: I tensorflow/tsl/cuda/cudart_stub.cc:28] Could not find cuda drivers on your machine, GPU will not be used.
2023-04-13 16:17:27.716536: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


# Définition des PATH pour charger les images et enregistrer les résultats

In [2]:
# Input images and output folder are both located under the current working directory.
PATH = os.getcwd()
PATH_Data = f'{PATH}/data/fruits-360_dataset/fruits-360/Test'
PATH_Result = f'{PATH}/data/fruits-360_dataset/fruits-360/Results'

# One "name: value" line per location, values aligned in a column.
print(f'PATH:        {PATH}\n'
      f'PATH_Data:   {PATH_Data}\n'
      f'PATH_Result: {PATH_Result}')

PATH:        /home/tomatoketchoup/deployer_model_cloud/notebook
PATH_Data:   /home/tomatoketchoup/deployer_model_cloud/notebook/data/fruits-360_dataset/fruits-360/Test
PATH_Result: /home/tomatoketchoup/deployer_model_cloud/notebook/data/fruits-360_dataset/fruits-360/Results


# Création de la SparkSession

In [3]:
# Build the notebook's SparkSession, or reuse the one that already exists.
spark = (
    SparkSession.builder
    .master('local')
    .appName('P8')
    .config("spark.sql.parquet.writeLegacyFormat", 'true')
    .getOrCreate()
)

23/04/13 16:17:40 WARN Utils: Your hostname, TECLAST resolves to a loopback address: 127.0.1.1; using 172.18.238.92 instead (on interface eth0)
23/04/13 16:17:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/13 16:17:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


This code creates a SparkSession object named spark.

The SparkSession is created through the `builder` attribute (an attribute, not a method call), which returns a SparkSession.Builder object. This object allows you to specify various configuration options for the SparkSession.

The appName() method is used to specify the name of the Spark application, which is set to 'P8' in this case.

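The master() method is used to specify the URL of the cluster manager to connect to. In this case, it is set to 'local', which means the Spark application will run on the local machine with a single worker thread. To use several cores, the master would be 'local[N]' (N threads) or 'local[*]' (as many threads as there are logical cores).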

The config() method is used to set a configuration parameter for the SparkSession. In this case, it is used to set the spark.sql.parquet.writeLegacyFormat parameter to 'true'. This parameter controls whether Parquet files are written using the legacy format or not.

Finally, the getOrCreate() method is called to create a new SparkSession or retrieve an existing one if it already exists. The SparkSession is then assigned to the variable spark.

In [4]:
# Keep a handle on the SparkContext attached to the session; it is used further
# down to create a broadcast variable (sc.broadcast).
sc = spark.sparkContext

This line of code does not create a new SparkContext: it retrieves the SparkContext that the SparkSession object `spark` already owns and binds it to the name `sc`.

SparkContext is the entry point to any Spark functionality, and it allows you to communicate with a Spark cluster. The SparkSession object is a unified entry point for interacting with structured data in Spark and includes support for both batch and streaming data processing.

Once you have a SparkContext object, you can use it to create RDDs (Resilient Distributed Datasets) and perform various transformations and actions on them.

In [5]:
# Show the session object as the cell output.
spark

# Traitement des données

In [6]:
# Read the files under PATH_Data (sub-folders included) whose name matches *.jpg,
# using Spark's "binaryFile" source.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

                                                                                

This code reads in binary image files in the JPEG format from a specified directory using Apache Spark, a distributed computing framework.

Here is a breakdown of the code:

spark is an instance of a SparkSession that provides a programming interface to interact with Spark.
read is a property of SparkSession that returns a DataFrameReader, the object used to read data from external storage systems.
format("binaryFile") specifies the format of the data to be read as binary files.
.option("pathGlobFilter", "*.jpg") filters the files to be read to only those with a ".jpg" file extension.
.option("recursiveFileLookup", "true") specifies that the files should be searched recursively within subdirectories of the specified path.
.load(PATH_Data) loads the binary image files from the specified path, and returns a DataFrame object that can be further processed and analyzed using Spark.
Overall, this code snippet is useful for efficiently reading and processing a large number of binary image files in parallel using Spark's distributed computing capabilities.

In [7]:
# The label is the name of the folder that contains the file, i.e. the
# second-to-last component of the '/'-separated path (.../Test/<label>/<file>.jpg).
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# printSchema() and show() write to stdout themselves and return None, so they
# are called directly: wrapping them in print() only adds a stray "None" line.
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+------------------------------------------------------------------------------------------------------------------------+----------+
|path                                                                                                                    |label     |
+------------------------------------------------------------------------------------------------------------------------+----------+
|file:/home/tomatoketchoup/deployer_model_cloud/notebook/data/fruits-360_dataset/fruits-360/Test/Watermelon/r_106_100.jpg|Watermelon|
|file:/home/tomatoketchoup/deployer_model_cloud/notebook/data/fruits-360_dataset/fruits-360/Test/Watermelon/r_109_100.jpg|Watermelon|
|file:/home/tomatoketchoup/deployer_model_cloud/notebook/data/fruits-360_dataset/fruits-360/Test/Watermelon/r_108_100.jp

## Préparation du modèle

In [27]:
# Complete MobileNetV2 (top included) with ImageNet weights, for 224x224x3 inputs.
model = MobileNetV2(
    input_shape=(224, 224, 3),
    include_top=True,
    weights='imagenet',
)

In [28]:
# Feature extractor: same input as `model`, output taken from its
# second-to-last layer, so the last layer of `model` is left out.
new_model = Model(outputs=model.layers[-2].output, inputs=model.input)

In [29]:
# Print the layer-by-layer description of the feature extractor.
new_model.summary()

Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_2 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_2[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                           

 block_3_expand_relu (ReLU)     (None, 56, 56, 144)  0           ['block_3_expand_BN[0][0]']      
                                                                                                  
 block_3_pad (ZeroPadding2D)    (None, 57, 57, 144)  0           ['block_3_expand_relu[0][0]']    
                                                                                                  
 block_3_depthwise (DepthwiseCo  (None, 28, 28, 144)  1296       ['block_3_pad[0][0]']            
 nv2D)                                                                                            
                                                                                                  
 block_3_depthwise_BN (BatchNor  (None, 28, 28, 144)  576        ['block_3_depthwise[0][0]']      
 malization)                                                                                      
                                                                                                  
 block_3_d

 lization)                                                                                        
                                                                                                  
 block_7_expand (Conv2D)        (None, 14, 14, 384)  24576       ['block_6_project_BN[0][0]']     
                                                                                                  
 block_7_expand_BN (BatchNormal  (None, 14, 14, 384)  1536       ['block_7_expand[0][0]']         
 ization)                                                                                         
                                                                                                  
 block_7_expand_relu (ReLU)     (None, 14, 14, 384)  0           ['block_7_expand_BN[0][0]']      
                                                                                                  
 block_7_depthwise (DepthwiseCo  (None, 14, 14, 384)  3456       ['block_7_expand_relu[0][0]']    
 nv2D)    

                                                                                                  
 block_10_depthwise_relu (ReLU)  (None, 14, 14, 384)  0          ['block_10_depthwise_BN[0][0]']  
                                                                                                  
 block_10_project (Conv2D)      (None, 14, 14, 96)   36864       ['block_10_depthwise_relu[0][0]']
                                                                                                  
 block_10_project_BN (BatchNorm  (None, 14, 14, 96)  384         ['block_10_project[0][0]']       
 alization)                                                                                       
                                                                                                  
 block_11_expand (Conv2D)       (None, 14, 14, 576)  55296       ['block_10_project_BN[0][0]']    
                                                                                                  
 block_11_

 block_14_depthwise (DepthwiseC  (None, 7, 7, 960)   8640        ['block_14_expand_relu[0][0]']   
 onv2D)                                                                                           
                                                                                                  
 block_14_depthwise_BN (BatchNo  (None, 7, 7, 960)   3840        ['block_14_depthwise[0][0]']     
 rmalization)                                                                                     
                                                                                                  
 block_14_depthwise_relu (ReLU)  (None, 7, 7, 960)   0           ['block_14_depthwise_BN[0][0]']  
                                                                                                  
 block_14_project (Conv2D)      (None, 7, 7, 160)    153600      ['block_14_depthwise_relu[0][0]']
                                                                                                  
 block_14_

In [30]:
# Wrap the extractor's weights in a Spark broadcast variable; model_fn() reads
# them back through `.value`.
# NOTE(review): "brodcast" is a misspelling of "broadcast"; the name is left
# untouched here because model_fn() refers to it.
brodcast_weights = sc.broadcast(new_model.get_weights())

In [31]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor and load the broadcast weights.

    The network is created with ``weights=None`` because every weight is
    overwritten by ``set_weights`` just below: requesting the ImageNet
    weights here would only make each call load (and possibly download) a
    weight file whose values are immediately replaced.

    :return: a Keras ``Model`` taking a 224x224x3 input and returning the
             output of the second-to-last layer of MobileNetV2, with every
             layer marked as non-trainable.
    """
    base = MobileNetV2(weights=None,
                       include_top=True,
                       input_shape=(224, 224, 3))
    # The model is only used for inference: freeze every layer.
    for layer in base.layers:
        layer.trainable = False
    extractor = Model(inputs=base.input,
                      outputs=base.layers[-2].output)
    # Pretrained values come from the broadcast variable created from `new_model`.
    extractor.set_weights(brodcast_weights.value)
    return extractor

In [32]:
def preprocess(content):
    """
    Decode raw image bytes into an array ready for MobileNetV2.

    :param content: encoded image file content (bytes)
    :return: the 224x224 three-channel image as an array, passed through
             ``preprocess_input``
    """
    # Force three channels before resizing: a grayscale, palette, RGBA or CMYK
    # file would otherwise yield an array whose channel count is not 3, which
    # breaks np.stack on the batch and the model's (224, 224, 3) input.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Run the model over one batch of encoded images.

    :param model: object exposing ``predict`` (the feature extractor)
    :param content_series: pd.Series whose items are raw image bytes
    :return: pd.Series holding one flattened feature vector per image
    """
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # Some layers emit multi-dimensional tensors; each prediction is flattened
    # to a 1-D vector so that it can be stored in a Spark DataFrame column.
    flat_vectors = []
    for prediction in predictions:
        flat_vectors.append(prediction.flatten())
    return pd.Series(flat_vectors)

@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    Scalar-iterator pandas UDF wrapping the featurization function.

    The UDF flavour is declared through the Python type hints
    (Iterator[pd.Series] -> Iterator[pd.Series]) instead of the deprecated
    ``PandasUDFType.SCALAR_ITER`` constant. The decorator argument states
    that the resulting Spark column has type ``array<float>``.

    :param content_series_iter: iterator over batches of data, where each
                                batch is a pandas Series of image bytes.
    :return: yields, for each input batch, a pandas Series of feature vectors.
    '''
    # The model is built once per call of this function and re-used for every
    # batch of the iterator, which amortizes the cost of loading it.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

## Exécution des actions d'extraction de features

In [33]:
# Split the images into 20 partitions, then keep the path, the label and the
# feature vector computed by featurize_udf from the raw file content.
features_df = (
    images
    .repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

In [26]:
# Show the DataFrame's column names and types as the cell output.
features_df

DataFrame[path: string, label: string, features: array<float>]

In [22]:
# Print the folder used for the results.
print(PATH_Result)

/home/tomatoketchoup/deployer_model_cloud/notebook/data/fruits-360_dataset/fruits-360/Results


## 3.8 Chargement des données enregistrées et validation du résultat

<u>On charge les données fraichement enregistrées dans un **DataFrame Pandas**</u> :

In [34]:
# Load the parquet data stored under PATH_Result into a pandas DataFrame.
# NOTE(review): no visible cell writes features_df to PATH_Result; presumably
# the write step lives in a cell that is not shown here. Confirm it exists and
# runs before this one, otherwise this reads stale or unrelated files.
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [35]:
# Preview the first rows of the loaded data.
df.head()

In [36]:
# Shape of the feature vector stored in the row labelled 0.
# NOTE(review): the recorded output of this cell is "KeyError: 'features'", so
# the data read from PATH_Result apparently has no 'features' column. Check
# df.columns and what was actually written to PATH_Result.
df.loc[0,'features'].shape

KeyError: 'features'