In [None]:
# !pip install Pandas pillow tensorflow keras pyspark pyarrow

## Import des librairies

In [1]:
import time
import os
import sys

import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
import io
from PIL import Image

from pyspark.sql.types import StructType, StructField, FloatType, StringType, ArrayType
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
# from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql import SparkSession


## Data Path

In [2]:
# Working directory, input-image directory and results directory,
# all resolved relative to where the notebook is launched.
PATH = os.getcwd()
_data_root = os.path.join(PATH, 'data')
PATH_Data = os.path.join(_data_root, 'Test1')
PATH_Result = os.path.join(_data_root, 'Results')
print('\n'.join((
    f'PATH:        {PATH}',
    f'PATH_Data:   {PATH_Data}',
    f'PATH_Result: {PATH_Result}',
)))

PATH:        C:\Users\chauv\Projets\OC\P8-BigData\P8_Notebook
PATH_Data:   C:\Users\chauv\Projets\OC\P8-BigData\P8_Notebook\data\Test1
PATH_Result: C:\Users\chauv\Projets\OC\P8-BigData\P8_Notebook\data\Results


## Variables d'environnement

In [3]:
# Tell PySpark which JDK to use, and make the driver and the workers run
# on the same interpreter as this kernel.
JAVA_HOME = "C:\\Program Files\\Java\\jdk-17\\"
PYSPARK_PYTHON = PYSPARK_DRIVER_PYTHON = sys.executable

os.environ.update(
    JAVA_HOME=JAVA_HOME,
    PYSPARK_PYTHON=PYSPARK_PYTHON,
    PYSPARK_DRIVER_PYTHON=PYSPARK_DRIVER_PYTHON,
)

## Spark Session

In [4]:
# Configure the session step by step: application name, local master,
# then the Parquet writeLegacyFormat option.
_builder = SparkSession.builder.appName('P8_Fruits!').master('local')
_builder = _builder.config("spark.sql.parquet.writeLegacyFormat", 'true')
# Reuse an already-running session if there is one, otherwise start it.
spark = _builder.getOrCreate()

In [5]:
# Keep a handle on the session's SparkContext (used later to broadcast the model weights);
# the bare expression on the last line displays it as the cell output.
sc = spark.sparkContext
sc

## Traitement des données

### Chargement du modèle

In [6]:
# Full ImageNet-pretrained MobileNetV2, classification head included.
model = MobileNetV2(input_shape=(224, 224, 3),
                    include_top=True,
                    weights='imagenet')
# Feature extractor: same input, but stop at the second-to-last layer
# so the final classification layer is left out.
penultimate_output = model.layers[-2].output
new_model = Model(inputs=model.input, outputs=penultimate_output)
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

### Broadcast des poids

In [8]:
# Share the feature extractor's weights through a Spark broadcast variable.
# NOTE(review): the name is spelled `brodcast_weights`; model_fn() reads it under that exact spelling.
brodcast_weights = sc.broadcast(new_model.get_weights())

## Préparation du Modèle

### Fonction Model_fn()

In [10]:
def model_fn() -> Model:
    """
    Build the MobileNetV2 feature extractor and load the broadcast weights into it.

    The network is created with its classification head and then cut at the
    second-to-last layer, the same way the broadcast weight list was produced,
    so ``set_weights`` receives a list that matches layer for layer.

    :return: model taking (224, 224, 3) images and returning the output of
        MobileNetV2's second-to-last layer, with every layer frozen
    """
    # weights=None: every value is overwritten by the broadcast weights below,
    # so loading (or downloading) the ImageNet checkpoint here would be wasted work.
    _model = MobileNetV2(weights=None,
                         include_top=True,
                         input_shape=(224, 224, 3))
    for layer in _model.layers:
        layer.trainable = False
    _new_model = Model(inputs=_model.input,
                       outputs=_model.layers[-2].output)
    # Reading a module-level name does not require a `global` declaration.
    _new_model.set_weights(brodcast_weights.value)
    return _new_model

### Vérification des images

In [30]:
# Check that every file under PATH_Data can be opened as an image.
# Fail loudly if the directory is missing: os.walk would silently yield nothing.
if not os.path.isdir(PATH_Data):
    raise FileNotFoundError(f"Image directory not found: {PATH_Data}")

invalid_images = []
for (dirpath, dirnames, filenames) in os.walk(PATH_Data):
    for filename in filenames:
        image_file = os.path.join(dirpath, filename)
        try:
            # The context manager closes the file handle once verify() has run.
            with Image.open(image_file) as candidate:
                candidate.verify()
        except Exception:
            invalid_images.append(image_file)
            print("Not a Valid Image : ", image_file)

# Report success only when no file failed verification.
if not invalid_images:
    print("All good!")

All good!


### Fonction preprocess

In [13]:
def preprocess_image(row) -> "tuple[np.ndarray, str]":
    """
    Decode, resize and normalise one image for MobileNetV2.

    :param row: Spark Row exposing ``content`` (raw file bytes) and ``path``
    :return: tuple of (preprocessed image array of shape (224, 224, 3),
        "<parent folder>/<file name>" built from the row's path)
    """
    with Image.open(io.BytesIO(row.content)) as source:
        # Force three channels: grayscale, palette, RGBA or CMYK files would
        # otherwise give arrays that do not fit the (224, 224, 3) model input.
        # For images that are already RGB this is a plain copy.
        resized = source.convert("RGB").resize((224, 224))

    # Pixel values -> numpy array -> MobileNetV2 input scaling
    image_array = preprocess_input(np.array(resized))

    image_path = row.path
    # Last component of the containing directory (paths use "/" separators)
    image_dir = os.path.dirname(image_path).split("/")[-1]
    # File name of the image
    image_name = os.path.basename(image_path)

    return image_array, f"{image_dir}/{image_name}"


### Paramètres

In [15]:
# Maximum size of the images held by one partition, in megabytes.
# NOTE(review): only referenced by the commented-out partition estimate — confirm it is still needed.
NB_MEGA_PER_PARTITION = 128

# Number of dimensions kept after dimensionality reduction
PCA_NEW_DIMENSION = 128

## Traitement des images

### 1. Chargement des images

In [None]:
# Step 1: load every *.jpg found under PATH_Data, sub-folders included,
# through Spark's binaryFile reader.
images = (
    spark.read
    .format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)


In [29]:
# Number of partitions used when repartitioning the feature DataFrame.
#
# Disabled size-based estimate, kept for reference:
# total_size = images.count() * images.rdd.map(lambda row: len(row.content)).mean()
# NUM_PARTITIONS = max(1, int(total_size / (NB_MEGA_PER_PARTITION * 1024 * 1024)))

NUM_PARTITIONS = 2
print(f'NUM_PARTITIONS:  {NUM_PARTITIONS}')

NUM_PARTITIONS:  2


### 2. Preprocess

In [17]:
# Step 2a: resize and preprocess each image through an RDD map ...
preprocessed_rdd = images.rdd.map(preprocess_image)
# ... then gather all (array, path) pairs on the driver.
preprocessed_images_with_paths = preprocessed_rdd.collect()

Temps de collecte = 1m 25s

In [18]:
# Split the collected (array, path) pairs: the arrays go into one numpy batch ...
preprocessed_images = np.array(
    [pixels for pixels, _ in preprocessed_images_with_paths]
)
# ... and the paths into a list kept in the same order.
image_paths = [rel_path for _, rel_path in preprocessed_images_with_paths]

### 3. Predictions

In [19]:
# Step 3: model prediction
# Rebuild the feature extractor from the broadcast weights
model = model_fn()
# preprocessed_images is already a single ndarray batch; wrapping it in
# np.array() again would only duplicate the whole image tensor in memory.
predictions = model.predict(preprocessed_images)



### 4. Réduction des dimensions

In [20]:
# Step 4: dimensionality reduction with PCA down to PCA_NEW_DIMENSION components.
# random_state is fixed because scikit-learn's default 'auto' solver may pick
# the randomized SVD, which would otherwise give different features on each run.
pca = PCA(n_components=PCA_NEW_DIMENSION, random_state=42)
reduced_features = pca.fit_transform(predictions)

### 5. Création du DataFrame Spark

In [21]:
# Schema: the image's relative path plus its reduced feature vector
# (an ArrayType column, one float per component).
features_schema = StructType([
    StructField("image_path", StringType()),
    StructField("features", ArrayType(FloatType())),
])
# One row per image, pairing each path with the feature vector at the same index.
feature_rows = [
    (path, reduced_features[idx].tolist())
    for idx, path in enumerate(image_paths)
]
features_df = spark.createDataFrame(feature_rows, schema=features_schema)
# Repartition the DataFrame into NUM_PARTITIONS partitions
df_spark = features_df.repartition(NUM_PARTITIONS)

### 6. Visualisation

In [22]:
# Step 6: display the first 3 rows of the DataFrame without truncating column values
df_spark.show(3, truncate=False)

+---------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

### 7. Sauvegarde

In [23]:
# Step 7: save the features as Parquet under PATH_Result (overwriting any
# earlier output) and report how long the write took.
t0 = time.perf_counter()
(
    df_spark.write
    .mode("overwrite")
    .parquet(PATH_Result)
)
t1 = time.perf_counter()
print(f"total time:  {t1 - t0}")

total time:  1.3113184999674559


In [24]:
# Read the saved Parquet output back with pandas (pyarrow engine) to inspect what was written
df_pandas = pd.read_parquet(PATH_Result, engine='pyarrow')

In [25]:
# Summary of the reloaded DataFrame
df_pandas.describe()

Unnamed: 0,image_path,features
count,814,814
unique,814,814
top,Apple Braeburn/r_73_100.jpg,"[10.874676, 0.21191794, 2.7552166, 4.7443943, ..."
freq,1,1


In [26]:
# Shape of the feature vector stored in the first row
df_pandas.loc[0,'features'].shape

(128,)

In [27]:
# Preview the first rows of the reloaded DataFrame
df_pandas.head()

Unnamed: 0,image_path,features
0,Apple Braeburn/r_73_100.jpg,"[10.874676, 0.21191794, 2.7552166, 4.7443943, ..."
1,apple_braeburn_1/r0_299.jpg,"[15.350532, -1.2815833, -1.4976716, -4.29906, ..."
2,Apple Braeburn/r_59_100.jpg,"[11.281214, -0.41652662, 3.4917502, 5.412005, ..."
3,Carambula/264_100.jpg,"[-8.8863535, -6.7135005, -1.7254672, 1.8977714..."
4,Carambula/r_302_100.jpg,"[-8.680854, 9.945538, 3.6776936, 0.2960881, 1...."


In [28]:
# FIN