In [80]:
import os
import io
import time
from socket import gethostname

import numpy as np
import pandas as pd
from PIL import Image

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.models import load_model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.types import Row, StringType

from pyspark.ml import Pipeline, Transformer
from pyspark.ml.feature import PCAModel, PCA, MinMaxScaler, StringIndexer, VectorIndexer, IndexToString
from pyspark.ml.linalg import Vectors
from pyspark.ml.functions import vector_to_array, array_to_vector
from pyspark.sql.functions import col, udf, pandas_udf, PandasUDFType, element_at, split

from pyspark import keyword_only
from pyspark.ml.param.shared import HasInputCol, HasOutputCol, Param, Params, TypeConverters
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable
from pyspark.sql.types import ArrayType, FloatType, StringType, Row
from pyspark.ml.linalg import Vectors, VectorUDT

from pyspark.sql.types import ArrayType, DoubleType




In [81]:
# Déterminer si le code s'exécute sur AWS
is_aws = gethostname() != 'Innas-MacBook-Pro.local'

# Configuration des chemins de données
if is_aws:
    # AWS
    PATH = 's3://shved-bucket'
else:
    # Des pistes pour l'environnement local
    PATH = '/Users/innakonar/Desktop/Projet8/'
    os.environ["JAVA_HOME"] = "/usr/local/opt/openjdk@11/libexec/openjdk.jdk/Contents/Home"
    os.environ["SPARK_HOME"] = "/usr/local/Cellar/apache-spark/3.5.0/libexec"

# Voies courantes
PATH_Data = PATH + 'Test/'
PATH_RESULTS = PATH + 'Results/'
PATH_pipe_model = PATH + '/pipeline_model'
PATH_Result = PATH + '/Results_PCA'

# Création des répertoires si nécessaire
os.makedirs(PATH_RESULTS, exist_ok=True)
os.makedirs(PATH_pipe_model, exist_ok=True)
os.makedirs(PATH_Result, exist_ok=True)

# SparkSession
spark = SparkSession.builder.appName("FruitScout")

# Options supplémentaires pour le démarrage local
if not is_aws:
    spark = spark.master("local[4]").config("spark.sql.parquet.writeLegacyFormat", 'true').getOrCreate()
else:
    spark = spark.getOrCreate()


In [82]:
print('PATH:                  ' + PATH + 
      '\nPATH_Data:           ' + PATH_Data + 
      '\nPATH_RESULTS:        ' + PATH_RESULTS + 
      '\nPATH_pipeline_model: ' + PATH_pipe_model +
      '\nPATH_Result:         ' + PATH_Result)


PATH:                  /Users/innakonar/Desktop/Projet8/
PATH_Data:           /Users/innakonar/Desktop/Projet8/Test/
PATH_RESULTS:        /Users/innakonar/Desktop/Projet8/Results/
PATH_pipeline_model: /Users/innakonar/Desktop/Projet8//pipeline_model
PATH_Result:         /Users/innakonar/Desktop/Projet8//Results_PCA


In [83]:
t0 = time.time()

In [84]:
#Récupération du label à partir du chemin du fichier
class PathToLabelTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(PathToLabelTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        return dataset.withColumn("label", element_at(split(dataset["path"], "/"), -2))

  

In [85]:
#Transformation de l’image en features 
class ImageFeatureTransformer(Transformer, HasInputCol, HasOutputCol, DefaultParamsReadable, DefaultParamsWritable):
    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(ImageFeatureTransformer, self).__init__()
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        sc = SparkSession.builder.getOrCreate().sparkContext

        def load_model():
            base_model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))
            for layer in base_model.layers:
                layer.trainable = False
            model = Model(inputs=base_model.input, outputs=base_model.layers[-2].output)
            return model

        model = load_model()

        broadcast_weights = sc.broadcast(model.get_weights())

        def preprocess(content):
            img = Image.open(io.BytesIO(content)).resize([224, 224])
            arr = img_to_array(img)
            return preprocess_input(arr)

        def featurize(content):
            model.set_weights(broadcast_weights.value)
            arr = preprocess(content)
            preds = model.predict(np.expand_dims(arr, axis=0))
            return preds.flatten().tolist()

        featurize_udf = udf(featurize, ArrayType(FloatType()))
        return dataset.withColumn(self.getOutputCol(), featurize_udf(col(self.getInputCol())))


In [86]:
#Transformation du tableau de features en vecteur de features
class ArrayToVectorTransformer(Transformer, HasInputCol, HasOutputCol,DefaultParamsReadable, DefaultParamsWritable):

    @keyword_only
    def __init__(self, inputCol=None, outputCol=None):
        super(ArrayToVectorTransformer, self).__init__()
        self._setDefault(inputCol=None, outputCol=None)
        kwargs = self._input_kwargs
        self.setParams(**kwargs)

    @keyword_only
    def setParams(self, inputCol=None, outputCol=None):
        kwargs = self._input_kwargs
        return self._set(**kwargs)

    def _transform(self, dataset):
        toArray = udf(lambda x: Vectors.dense(x), VectorUDT())
        return dataset.withColumn(self.getOutputCol(), toArray(self.getInputCol()))


In [87]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)



In [88]:
images.show(10)

+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|file:/Users/innak...|2023-11-12 14:23:...|  7353|[FF D8 FF E0 00 1...|
|file:/Users/innak...|2023-11-12 14:23:...|  7350|[FF D8 FF E0 00 1...|
|file:/Users/innak...|2023-11-12 14:23:...|  7349|[FF D8 FF E0 00 1...|
|file:/Users/innak...|2023-11-12 14:23:...|  7348|[FF D8 FF E0 00 1...|
|file:/Users/innak...|2023-11-12 14:23:...|  7328|[FF D8 FF E0 00 1...|
|file:/Users/innak...|2023-11-12 14:23:...|  7301|[FF D8 FF E0 00 1...|
|file:/Users/innak...|2023-11-12 14:23:...|  7278|[FF D8 FF E0 00 1...|
|file:/Users/innak...|2023-11-12 14:23:...|  7275|[FF D8 FF E0 00 1...|
|file:/Users/innak...|2023-11-12 14:23:...|  7266|[FF D8 FF E0 00 1...|
|file:/Users/innak...|2023-11-12 14:23:...|  7266|[FF D8 FF E0 00 1...|
+--------------------+--------------------+------+--------------

In [89]:
# Initialisation des transformateurs

path_to_label = PathToLabelTransformer(inputCol="path", outputCol="label")
label_indexer = StringIndexer(inputCol="label", outputCol="label_index") 
image_feature = ImageFeatureTransformer(inputCol="content", outputCol="features") 
array_to_vector = ArrayToVectorTransformer(inputCol="features", outputCol="features_vector")

# MinMaxScaler
minMaxScaler = MinMaxScaler(inputCol="features_vector", outputCol="scaled_features")

# PCA
pca = PCA(k=1000, inputCol="scaled_features", outputCol="pca_features")

# Pipeline
pipeline = Pipeline(stages=[path_to_label, image_feature, array_to_vector, label_indexer, minMaxScaler, pca])



In [41]:
# sampled_data = images.limit(300)

In [None]:
# pipeline_model = pipeline.fit(sampled_data)
# transformed_df = pipeline_model.transform(sampled_data)

In [90]:
pipeline_model = pipeline.fit(images)
transformed_df = pipeline_model.transform(images)

2023-11-17 21:03:16.809318: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-11-17 21:03:16.809318: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-11-17 21:03:16.809321: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-11-17 21:03:16.809322: I tensorflow/core/platform/cpu_featu























































































































































































































































































2023-11-17 21:33:35.705078: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-11-17 21:33:35.705081: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.






























































































































































































































































































2023-11-17 22:01:30.904624: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


























































































































































































































































































                                                                                

In [91]:
# Sauvegarde des résultats
transformed_df.write.mode("overwrite").parquet(PATH_RESULTS)
pipeline_model.save(PATH_pipe_model)
pca_result = transformed_df.select("pca_features")
pca_result.write.mode("overwrite").parquet(PATH_Result)



23/11/17 22:31:09 WARN DAGScheduler: Broadcasting large task binary with size 10.1 MiB
2023-11-17 22:31:11.246279: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-11-17 22:31:11.246301: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-11-17 22:31:11.246307: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appro























































































































































































































































































































































































































































































































































































23/11/17 23:23:48 WARN TaskSetManager: Stage 132 contains a task of very large size (10221 KiB). The maximum recommended task size is 1000 KiB.
23/11/17 23:23:48 WARN DAGScheduler: Broadcasting large task binary with size 10.0 MiB




























































































































































































































































































                                                                                

# Chargement des données enregistrées et validation du résultat

In [92]:
 df = pd.read_parquet(PATH_RESULTS, engine='pyarrow')

In [93]:
 df.tail(40)

Unnamed: 0,path,modificationTime,length,content,label,features,features_vector,label_index,scaled_features,pca_features
22648,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.196,2400,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana,"[0.8487628, 0.040467136, 0.0, 0.01474183, 0.68...","{'type': 1, 'size': None, 'indices': None, 'va...",26.0,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
22649,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.199,2400,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana,"[0.077229574, 0.34015042, 0.1347241, 0.0032964...","{'type': 1, 'size': None, 'indices': None, 'va...",26.0,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
22650,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.201,2399,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana,"[0.056909174, 0.1369728, 0.0, 0.009847593, 0.1...","{'type': 1, 'size': None, 'indices': None, 'va...",26.0,"{'type': 0, 'size': 1280.0, 'indices': [0, 1, ...","{'type': 1, 'size': None, 'indices': None, 'va..."
22651,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.202,2399,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana,"[0.01736346, 0.18086791, 0.0, 0.008772529, 0.3...","{'type': 1, 'size': None, 'indices': None, 'va...",26.0,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
22652,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.198,2398,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana,"[0.36592633, 0.03763305, 0.0, 0.0032287813, 0....","{'type': 1, 'size': None, 'indices': None, 'va...",26.0,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
22653,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.196,2398,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana,"[0.49564168, 0.024583155, 0.0, 0.0026560414, 0...","{'type': 1, 'size': None, 'indices': None, 'va...",26.0,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
22654,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.145,2397,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana Lady Finger,"[0.053133126, 0.0, 0.3847835, 0.0, 1.3311466, ...","{'type': 1, 'size': None, 'indices': None, 'va...",110.0,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
22655,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.206,2397,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana,"[0.09214552, 0.009674783, 0.0, 0.0, 0.7438581,...","{'type': 1, 'size': None, 'indices': None, 'va...",26.0,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
22656,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.203,2396,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana,"[0.062905274, 0.255593, 0.0, 0.0, 0.055446304,...","{'type': 1, 'size': None, 'indices': None, 'va...",26.0,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."
22657,file:/Users/innakonar/Desktop/Projet8/Test/Ban...,2023-11-12 13:23:40.199,2395,b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...,Banana,"[0.08627917, 0.36359492, 0.0, 0.0, 0.3427798, ...","{'type': 1, 'size': None, 'indices': None, 'va...",26.0,"{'type': 1, 'size': None, 'indices': None, 'va...","{'type': 1, 'size': None, 'indices': None, 'va..."


<!-- df1.pca_features -->

In [94]:
# df_pca = pd.read_parquet(PATH_Result, engine='pyarrow')

In [None]:
# df_pca.head

In [96]:
def sparse_vector_to_list(row):
    if isinstance(row, dict) and 'values' in row:
        return row['values']
    return row

df['pca_features_list'] = df['pca_features'].apply(sparse_vector_to_list)

df.to_csv("Result_PCA.csv", index=False)


In [97]:
 def sparse_vector_to_list(row):
    if isinstance(row, dict) and 'values' in row:
         return row['values']
    return row
df['pca_features_list'] = df['pca_features'].apply(sparse_vector_to_list)

df.to_csv("Result_PCA.csv", index=False)


In [98]:
csv_file2 = "Result_PCA.csv"
df4 = pd.read_csv(csv_file2)
print(df.head())

                                                path        modificationTime   
0  file:/Users/innakonar/Desktop/Projet8/Test/Wat... 2023-11-12 13:23:44.106  \
1  file:/Users/innakonar/Desktop/Projet8/Test/Wat... 2023-11-12 13:23:44.107   
2  file:/Users/innakonar/Desktop/Projet8/Test/Wat... 2023-11-12 13:23:44.107   
3  file:/Users/innakonar/Desktop/Projet8/Test/Wat... 2023-11-12 13:23:44.107   
4  file:/Users/innakonar/Desktop/Projet8/Test/Wat... 2023-11-12 13:23:44.125   

   length                                            content       label   
0    7353  b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...  Watermelon  \
1    7350  b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...  Watermelon   
2    7349  b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...  Watermelon   
3    7348  b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...  Watermelon   
4    7328  b'\xff\xd8\xff\xe0\x00\x10JFIF\x00\x01\x01\x00...  Watermelon   

                                            features   
0  [0.

In [None]:
PATH = os.getcwd()
pipe_mdel = Pipeline.load(PATH + '/pipeline_model')
                                                                                
exp_var_cumul = np.append(0, np.cumsum([0] + pipe_mdel.getStages()[-1].explainedVariance))
exp_var_cumul

In [None]:
df = pd.DataFrame({'pca': exp_var_cumul})
from plotly import express as px
fig = px.line(df, y="pca", title='Variance cumulée expliquée')
fig.update_layout({
    "width": 800,
    "height": 800,
    "xaxis_title": "Nb composantes PCA",
    "yaxis_title": "Ratio variance expliquée"
})

fig.show()

In [168]:
 elapsed_time = time.time() - t0

In [169]:
print(f"durée d'execution: {time.time() - t0}")
w = True
while w:
    a = input("Appuyez sur Entrée pour arrêter: ")
    if a == '':
        
        w=False

durée d'execution: 10148.41208600998
Appuyez sur Entrée pour arrêter: 


In [170]:
# spark.stop()