# Classification d'images de fruits

## 1. Import des packages et librairies

In [28]:
import os
import io
import boto3
import random
import numpy as np
import pandas as pd
from PIL import Image
from io import BytesIO
import tensorflow as tf
import matplotlib.pyplot as plt
from tensorflow.keras import Model
from sklearn.decomposition import PCA
from tensorflow.keras.preprocessing.image import img_to_array
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 2. Définition des PATH pour charger les images et enregistrer les résultats

In [29]:
# Définir les chemins
PATH = 's3://p8fruits-data'
PATH_Data = PATH + '/Test'
PATH_Result_csv = PATH + '/Results_csv'
PATH_Result_parquet = PATH + '/Results_parquet'
PATH_Result_PCA_csv = PATH + '/Results_PCA_csv'
PATH_Result_PCA_parquet = PATH + '/Results_PCA_parquet'

# Impression des chemins avec une concaténation correcte
print('PATH:                   ' + 
      PATH + '\nPATH_Data:              ' + 
      PATH_Data + '\nPATH_Result_csv:        ' + 
      PATH_Result_csv + '\nPATH_Result_parquet:   ' + 
      PATH_Result_parquet + '\nPATH_Result_PCA_csv: ' + 
      PATH_Result_PCA_csv + '\nPATH_Result_PCA_parquet: ' + 
      PATH_Result_PCA_parquet)

# Collecter tous les chemins d'images dans Test et ses sous-répertoires
image_files = []
for root, dirs, files in os.walk(PATH_Data):
    for file in files:
        if file.lower().endswith('.jpg'):
            image_files.append(os.path.join(root, file))


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

PATH:                   s3://p8fruits-data
PATH_Data:              s3://p8fruits-data/Test
PATH_Result_csv:        s3://p8fruits-data/Results_csv
PATH_Result_parquet:   s3://p8fruits-data/Results_parquet
PATH_Result_PCA_csv: s3://p8fruits-data/Results_PCA_csv
PATH_Result_PCA_parquet: s3://p8fruits-data/Results_PCA_parquet

## 3. Chargement et bref apperçu des données

### 3.1 Chargement des données

In [30]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 3.2 Brève Data exploration

In [31]:
images.show(5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-------------------+------+--------------------+
|                path|   modificationTime|length|             content|
+--------------------+-------------------+------+--------------------+
|s3://p8fruits-dat...|2024-08-15 09:28:17|  7226|[FF D8 FF E0 00 1...|
|s3://p8fruits-dat...|2024-08-15 09:26:52|  7215|[FF D8 FF E0 00 1...|
|s3://p8fruits-dat...|2024-08-15 09:24:59|  7019|[FF D8 FF E0 00 1...|
|s3://p8fruits-dat...|2024-08-15 09:26:48|  7009|[FF D8 FF E0 00 1...|
|s3://p8fruits-dat...|2024-08-15 09:24:55|  6969|[FF D8 FF E0 00 1...|
+--------------------+-------------------+------+--------------------+
only showing top 5 rows

In [32]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
images.printSchema()
images.select('path','label').show(5,False)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

+-----------------------------------------+-----+
|path                                     |label|
+-----------------------------------------+-----+
|s3://p8fruits-data/Test/r_83_100_483.jpg |Test |
|s3://p8fruits-data/Test/r_175_100_755.jpg|Test |
|s3://p8fruits-data/Test/309_100_623.jpg  |Test |
|s3://p8fruits-data/Test/r_168_100_691.jpg|Test |
|s3://p8fruits-data/Test/288_100_353.jpg  |Test |
+-----------------------------------------+-----+
only showing top 5 rows

In [33]:
# Initialiser un client S3
s3 = boto3.client('s3')

# Définir les informations du bucket et du dossier
bucket_name = 'p8fruits-data'
prefix = 'Test/'

# Collecter tous les chemins d'images dans le bucket S3
image_files = []
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=prefix)

# Ajouter les fichiers .jpg à la liste
for content in response.get('Contents', []):
    if content['Key'].lower().endswith('.jpg'):
        image_files.append(content['Key'])

# Sélectionner aléatoirement 20 images pour affichage (ou moins si le nombre total est inférieur)
num_images = min(20, len(image_files))
sample_images = random.sample(image_files, num_images)

# Initialiser les dimensions
image_width, image_height = None, None

# Afficher les images et leurs dimensions
plt.figure(figsize=(10, 8))  # Ajuster la taille de la figure

for idx, image_key in enumerate(sample_images):
    try:
        # Télécharger l'image depuis S3 en mémoire
        img_data = s3.get_object(Bucket=bucket_name, Key=image_key)['Body'].read()

        # Charger l'image avec Pillow depuis le flux en mémoire
        img = Image.open(BytesIO(img_data))
        img = img.convert("RGB")  # Convertir en RGB

        # Obtenir les dimensions
        if image_width is None and image_height is None:
            image_width, image_height = img.size
            print(f"Dimensions de toutes les images : {image_width}x{image_height}")

        # Afficher l'image avec matplotlib
        plt.subplot(4, 5, idx + 1)
        plt.imshow(img)
        plt.axis('off')  # Masquer les axes

    except Exception as e:
        print(f"Erreur avec l'image {image_key}: {e}")

# Afficher les images sélectionnées
plt.tight_layout()
plt.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Dimensions de toutes les images : 100x100

## 4. Préparation du modèle

### 4.1 Import du modèle MobilNetV2

In [34]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.2 Définition d'un nouveau modèle via le transfert learning

In [35]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

### 4.3 Diffusion des poids du modèle sur les clusters

In [36]:
brodcast_weights = sc.broadcast(new_model.get_weights())

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [37]:
new_model.summary()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Model: "functional_1"
┏━━━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┳━━━━━━━━━━━━┳━━━━━━━━━━━━━━━━━━━┓
┃ Layer (type)        ┃ Output Shape      ┃    Param # ┃ Connected to      ┃
┡━━━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━╇━━━━━━━━━━━━╇━━━━━━━━━━━━━━━━━━━┩
│ input_layer_1       │ (None, 224, 224,  │          0 │ -                 │
│ (InputLayer)        │ 3)                │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1 (Conv2D)      │ (None, 112, 112,  │        864 │ input_layer_1[0]… │
│                     │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ bn_Conv1            │ (None, 112, 112,  │        128 │ Conv1[0][0]       │
│ (BatchNormalizatio… │ 32)               │            │                   │
├─────────────────────┼───────────────────┼────────────┼───────────────────┤
│ Conv1_relu (ReLU)   │ (None, 112, 112,  │          0

### 4.4 Fonction de définition du nouveau modèle

In [38]:
def model_fn():
    """
    Renvoie un modèle MobileNetV2 avec la couche supérieure supprimée 
    et des poids pré-entraînés diffusés
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## 5. Chargement des images et featurisation avec pandas UDF

# ## 5.1 Définition d'une fonction

In [39]:
def preprocess(content):
    """
    Prétraite les images brute pour la prédiction
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurise une série d'images brutes en utilisant le modèle d'entrée
    Retourn une serie de features d'image
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    
    # aplatissons les tenseurs de fonctionnalités en vecteurs pour un stockage plus facile dans Spark DataFrames
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Cette méthode est un UDF Scalar Iterator pandas enveloppant notre fonction de caractérisation.
    Le décorateur précise que cela renvoie une colonne Spark DataFrame de type Array.

    :param content_series_iter : cet argument est un itérateur sur des lots de données, où chaque lot est une série pandas de données d'image..
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



# ## 5.2 Extraction des features

In [40]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [41]:
features_df.coalesce(1)\
         .withColumn('features', col('features').cast('string'))\
        .write.mode("overwrite").csv(PATH_Result_csv)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [42]:
features_df.write.mode("overwrite").parquet(PATH_Result_parquet)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# # # 5.3 Chargement et description des features 

In [43]:
df = pd.read_parquet(PATH_Result_parquet, engine='pyarrow')

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
df.head(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                        path  ...                                           features
0  s3://p8fruits-data/Test/r_175_100_755.jpg  ...  [0.018488875, 0.09149935, 0.0, 0.0, 3.2394545,...
1  s3://p8fruits-data/Test/r_168_100_691.jpg  ...  [0.0, 0.08826472, 0.0, 0.0, 2.5968323, 0.61070...
2  s3://p8fruits-data/Test/r_187_100_633.jpg  ...  [0.08827592, 0.022499474, 0.0, 0.0, 0.01769151...
3  s3://p8fruits-data/Test/r_103_100_138.jpg  ...  [0.30975217, 0.3588128, 0.0, 0.0, 0.7571645, 1...
4   s3://p8fruits-data/Test/r_28_100_354.jpg  ...  [1.2042733, 2.9667635, 0.0, 0.0, 0.4696603, 0....
5  s3://p8fruits-data/Test/r_189_100_797.jpg  ...  [0.19742265, 0.0, 0.0, 0.0, 0.0, 0.0, 0.251751...
6   s3://p8fruits-data/Test/r_52_100_128.jpg  ...  [0.029218363, 0.0, 0.0, 0.0, 0.0, 1.4531016, 0...
7   s3://p8fruits-data/Test/r_89_100_733.jpg  ...  [0.0030743442, 0.095615715, 0.0, 0.0, 0.068235...
8   s3://p8fruits-data/Test/r_47_100_197.jpg  ...  [2.2517405, 0.31276655, 0.0, 0.0, 0.1864

In [45]:
df.loc[0,'features'].shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [46]:
print(df.shape)
print(df.columns)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1000, 3)
Index(['path', 'label', 'features'], dtype='object')

In [47]:
# Calculer le nombre de composants (longueur des listes) pour les 5 premières lignes
df['n_components'] = df['features'].apply(len)

# Afficher le nombre de composants pour les 5 premières lignes
print(df[['features', 'n_components']].head(5))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                            features  n_components
0  [0.018488875, 0.09149935, 0.0, 0.0, 3.2394545,...          1280
1  [0.0, 0.08826472, 0.0, 0.0, 2.5968323, 0.61070...          1280
2  [0.08827592, 0.022499474, 0.0, 0.0, 0.01769151...          1280
3  [0.30975217, 0.3588128, 0.0, 0.0, 0.7571645, 1...          1280
4  [1.2042733, 2.9667635, 0.0, 0.0, 0.4696603, 0....          1280

# # 6. Application d'un reduction de dimensions aux features

# ## 6.1 Définition de fonctions de PCA et UDF

In [48]:
# Exemple de données avec plusieurs caractéristiques par échantillon
features = np.random.rand(100, 50)  # 100 échantillons, 50 caractéristiques

# Nombre initial de caractéristiques
n_features_initial = features.shape[1]

print(f"Nombre initial de caractéristiques : {n_features_initial}")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Nombre initial de caract?ristiques : 50

In [49]:
from sklearn.decomposition import PCA
import numpy as np
import matplotlib.pyplot as plt

def optimal_n_components(features, variance_threshold=0.80):
    # Assurer que les données sont sous forme de tableau 2D
    # Si 'features' est une liste de matrices, les empiler et aplatir en une seule matrice 2D
    features_array = np.vstack([np.array(f).reshape(len(f), -1) for f in features])
    
    # Créer un modèle PCA
    pca = PCA()
    pca.fit(features_array)
    
    # Calculer la variance expliquée cumulée
    cumulative_variance = np.cumsum(pca.explained_variance_ratio_)
    
    # Trouver le nombre de composants nécessaires pour expliquer au moins 'variance_threshold' de la variance
    n_components_optimal = np.argmax(cumulative_variance >= variance_threshold) + 1
    
    return n_components_optimal, cumulative_variance

# Exemple de données (remplacez ceci par vos données réelles)
# Supposons que chaque 'feature' est une matrice 2D où les lignes sont les échantillons et les colonnes les caractéristiques
features = [np.random.rand(100, 10) for _ in range(50)]  # Remplacez ceci par vos données réelles

# Calculer le nombre optimal de composantes
optimal_components, cumulative_variance = optimal_n_components(features, 0.80)

print(f"Nombre optimal de composantes pour expliquer 80% de la variance : {optimal_components}")

# Afficher la variance expliquée cumulée
plt.plot(cumulative_variance)
plt.axhline(y=0.80, color='r', linestyle='--')
plt.xlabel('Nombre de composantes')
plt.ylabel('Variance expliquée cumulée')
plt.title('Variance expliquée cumulée en fonction du nombre de composantes')
plt.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Nombre optimal de composantes pour expliquer 80% de la variance : 8

In [50]:
# Ajouter une fonction pour appliquer PCA
def apply_pca(features, n_components=8):
    pca = PCA(n_components=n_components)
    reduced_features = pca.fit_transform(np.stack(features))
    return pd.Series(reduced_features.tolist())

# Appliquer PCA après la featurization
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_and_reduce_udf(content_series_iter):
    model = model_fn()
    for content_series in content_series_iter:
        features = featurize_series(model, content_series)
        reduced_features = apply_pca(features)
        yield reduced_features                                                                                                               
                                                                                                                                                  
                                                                                                                                                  # Utiliser la nouvelle UDF dans le pipeline Spark
features_PCA_df = images.repartition(20).select(
    col("path"),
    col("label"),
    featurize_and_reduce_udf("content").alias("features")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



# ## 6.2 Application de la focntion et apperçu des features avec PCA

In [51]:
# Utiliser la nouvelle UDF dans le pipeline Spark
features_PCA_df = images.repartition(20).select(
    col("path"),
    col("label"),
    featurize_and_reduce_udf("content").alias("features")
)

# Afficher le résultat et sauvegarder
features_PCA_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----+--------------------+
|                path|label|            features|
+--------------------+-----+--------------------+
|s3://p8fruits-dat...| Test|[-11.566925, 0.66...|
|s3://p8fruits-dat...| Test|[14.230675, -2.79...|
|s3://p8fruits-dat...| Test|[-2.2880695, 5.72...|
|s3://p8fruits-dat...| Test|[-4.462242, 6.320...|
|s3://p8fruits-dat...| Test|[-6.6240153, 1.04...|
|s3://p8fruits-dat...| Test|[-1.4022199, 3.26...|
|s3://p8fruits-dat...| Test|[3.9984443, 4.132...|
|s3://p8fruits-dat...| Test|[3.6370122, 5.803...|
|s3://p8fruits-dat...| Test|[-3.4081554, -3.5...|
|s3://p8fruits-dat...| Test|[-13.001686, -8.5...|
|s3://p8fruits-dat...| Test|[13.030794, -0.51...|
|s3://p8fruits-dat...| Test|[14.394788, -0.19...|
|s3://p8fruits-dat...| Test|[-0.58005476, 6.9...|
|s3://p8fruits-dat...| Test|[-3.9437342, 4.01...|
|s3://p8fruits-dat...| Test|[-4.8096185, 4.28...|
|s3://p8fruits-dat...| Test|[-6.70047, 3.5055...|
|s3://p8fruits-dat...| Test|[4.442547, 8.5970...|


In [52]:
## Ecrire et lire les résultats avec pandas

features_PCA_df.coalesce(1)\
         .withColumn('features', col('features').cast('string'))\
        .write.mode("overwrite").csv(PATH_Result_PCA_csv)

features_df.write.mode("overwrite").parquet(PATH_Result_PCA_parquet)


df_PCA = pd.read_parquet(PATH_Result_PCA_parquet, engine='pyarrow')
print(df.loc[0, 'features'].shape)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1280,)

In [53]:
df_PCA.shape

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

(1000, 3)

In [54]:
df_PCA.head(10)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

                                        path  ...                                           features
0   s3://p8fruits-data/Test/r_89_100_733.jpg  ...  [0.003073887, 0.09561673, 0.0, 0.0, 0.06823575...
1     s3://p8fruits-data/Test/102_100_77.jpg  ...  [0.21574153, 0.0019428681, 0.0, 0.0, 1.1624289...
2   s3://p8fruits-data/Test/r_74_100_555.jpg  ...  [0.24579911, 0.025969964, 0.0, 0.0, 0.39935267...
3     s3://p8fruits-data/Test/80_100_356.jpg  ...  [0.0, 0.61298555, 0.060554136, 0.23524463, 1.1...
4    s3://p8fruits-data/Test/124_100_717.jpg  ...  [0.15481292, 0.1709838, 0.0, 0.0, 0.4363604, 0...
5   s3://p8fruits-data/Test/r_52_100_128.jpg  ...  [0.029218294, 0.0, 0.0, 0.0, 0.0, 1.453101, 0....
6   s3://p8fruits-data/Test/r_47_100_439.jpg  ...  [0.2294881, 0.0, 0.0, 0.0, 0.020723723, 0.0, 0...
7  s3://p8fruits-data/Test/r_121_100_909.jpg  ...  [0.0, 2.3898084, 0.0, 1.754517, 0.22358634, 0....
8  s3://p8fruits-data/Test/r_314_100_609.jpg  ...  [0.020646328, 0.6035844, 0.090491205, 0.