In [1]:
!pip install Pandas pillow tensorflow pyspark pyarrow

Collecting py4j==0.10.9.7 (from pyspark)
  Downloading py4j-0.10.9.7-py2.py3-none-any.whl.metadata (1.5 kB)
Downloading py4j-0.10.9.7-py2.py3-none-any.whl (200 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m200.5/200.5 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hInstalling collected packages: py4j
Successfully installed py4j-0.10.9.7


In [50]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split, udf
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.ml.feature import PCA, StandardScaler

In [3]:
print(os.getcwd())

/home/jovyan/work


In [24]:
PATH = os.getcwd()
PATH_Data = PATH+'/fruits-360_dataset/fruits-360/Test/Apple Braeburn'
PATH_Result = PATH+'/fruits-360_dataset/fruits-360/Results'
PATH_Result_PCA = PATH+'/fruits-360_dataset/fruits-360/Results_PCA'
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result+\
     '\nPATH_Result_PCA: '+PATH_Result_PCA)

PATH:        /home/jovyan/work
PATH_Data:   /home/jovyan/work/fruits-360_dataset/fruits-360/Test/Apple Braeburn
PATH_Result: /home/jovyan/work/fruits-360_dataset/fruits-360/Results
PATH_Result_PCA: /home/jovyan/work/fruits-360_dataset/fruits-360/Results_PCA


In [5]:
spark = (SparkSession
             .builder
             .appName('P8')
             .master('local')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate()
)

In [6]:
sc = spark.sparkContext

In [7]:
spark

In [8]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [9]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------------------------------------------------------------------------+--------------+
|path                                                                                  |label         |
+--------------------------------------------------------------------------------------+--------------+
|file:/home/jovyan/work/fruits-360_dataset/fruits-360/Test/Apple Braeburn/r_326_100.jpg|Apple Braeburn|
|file:/home/jovyan/work/fruits-360_dataset/fruits-360/Test/Apple Braeburn/r_4_100.jpg  |Apple Braeburn|
|file:/home/jovyan/work/fruits-360_dataset/fruits-360/Test/Apple Braeburn/r_8_100.jpg  |Apple Braeburn|
|file:/home/jovyan/work/fruits-360_dataset/fruits-360/Test/Apple Braeburn/r_324_100.jpg|Apple Braeburn|
|file:/home/jovyan/work/fruits-360_dataset/fruits-360/Test/App

In [10]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5


In [11]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [12]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_1[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']        

In [13]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [14]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [15]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



In [16]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [17]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [18]:
print(PATH_Result)

/home/jovyan/work/fruits-360_dataset/fruits-360/Results


In [21]:
#Spark PCA
def array_to_vector(array):
    return Vectors.dense(array)

to_vector_udf = udf(array_to_vector, VectorUDT())

features_vector_df = features_df.withColumn("features_vector", to_vector_udf("features"))

In [49]:
features_vector_df.select('features_vector').show(25)

+--------------------+
|     features_vector|
+--------------------+
|[0.61855274438858...|
|[0.76240664720535...|
|[0.43452119827270...|
|[0.58045774698257...|
|[1.48074543476104...|
|[0.38210460543632...|
|[0.70593112707138...|
|[0.77286916971206...|
|[0.58776515722274...|
|[0.54296451807022...|
|[0.64172405004501...|
|[0.81505554914474...|
|[0.68259793519973...|
|[0.75976765155792...|
|[0.01354158204048...|
|[1.50734794139862...|
|[1.36691784858703...|
|[0.74703872203826...|
|[0.99582183361053...|
|[0.62548857927322...|
|[0.0,0.0,0.0,0.0,...|
|[0.81792730093002...|
|[0.92367011308670...|
|[0.80328118801116...|
|[0.66679257154464...|
+--------------------+
only showing top 25 rows



In [51]:
#StandardScaler
scaler = StandardScaler(inputCol="features_vector", 
                        outputCol="scaled_features", 
                        withMean=True,   # centrer
                        withStd=True)    # réduire

# Étape 2 : Ajuster et transformer les données
scaler_model = scaler.fit(features_vector_df)
scaled_df = scaler_model.transform(features_vector_df)

In [55]:
#dump du standard scaler
SCALER_PATH = PATH+"/models/standard_scaler"

# Sauvegarde du modèle
scaler_model.write().overwrite().save(SCALER_PATH)


In [54]:
#choix de k
pca = PCA(k=100, inputCol="scaled_features", outputCol="pca_features")
model = pca.fit(scaled_df)

explained_variance = model.explainedVariance.toArray()
cumulative_variance = explained_variance.cumsum()
print(f"For k = {k}, cumulative explained variance: {cumulative_variance}")

For k = 30, cumulative explained variance: [0.18320215 0.3405928  0.41683611 0.47850163 0.51989908 0.5520948
 0.57704972 0.59967152 0.61744637 0.63361936 0.64833157 0.66252876
 0.67518925 0.68711865 0.69725763 0.70728299 0.71627156 0.72437867
 0.73208208 0.73888094 0.7455104  0.75173493 0.75776125 0.76336112
 0.76880857 0.77395892 0.77897611 0.78362298 0.78821337 0.79273404
 0.79714815 0.80146657 0.80577044 0.80991568 0.81400175 0.81796006
 0.82183949 0.82563577 0.82936965 0.83308896 0.83674977 0.84023667
 0.84369742 0.84712995 0.85049607 0.85379701 0.85699285 0.8601003
 0.86313463 0.86610419 0.86902664 0.87190641 0.87469282 0.87741475
 0.88009453 0.88270277 0.8852752  0.88782427 0.8903258  0.8927906
 0.89520099 0.89755814 0.89987647 0.90212289 0.90431409 0.90647603
 0.90860974 0.91069873 0.91276794 0.91481332 0.91680747 0.91877403
 0.92071812 0.92264778 0.92452033 0.92634541 0.92815727 0.92993809
 0.93168194 0.93338308 0.93505714 0.93670375 0.93831218 0.93984701
 0.94137188 0.94287778

In [57]:
pca = PCA(k=41, inputCol="scaled_features", outputCol="pca_features")  # k = nombre de composantes principales désirées

pca_model = pca.fit(scaled_df)

pca_result = pca_model.transform(scaled_df)


In [63]:
#dump PCA
PCA_PATH = PATH+"/models/pca"

# Sauvegarde du modèle
pca_model.write().overwrite().save(PCA_PATH)

In [None]:
#features_df.write.mode("overwrite").parquet(PATH_Result)

In [58]:
vector_to_array_udf = udf(lambda v: v.toArray().tolist(), ArrayType(FloatType()))

pca_result_clean = pca_result.withColumn("pca_array", vector_to_array_udf("pca_features"))
#pca_result_clean.select("path", "label", "pca_array").show(truncate=False)

In [59]:
pca_result_clean.select("path", "label", "pca_array").write.mode("overwrite").parquet(PATH_Result_PCA)

In [60]:
df = pd.read_parquet(PATH_Result_PCA, engine='pyarrow')

In [61]:
df.head()

Unnamed: 0,path,label,pca_array
0,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[-14.683198, 4.102032, 15.725445, -7.6482406, ..."
1,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[-7.78049, -7.3234935, 13.995015, -11.8683605,..."
2,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[-21.829105, 9.647164, -19.66954, -17.515055, ..."
3,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[14.975967, 1.138271, -1.437785, 1.6589977, 3...."
4,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[-12.944075, 12.881019, -4.830613, 16.8954, 8...."


In [62]:
df.loc[0,'pca_array'].shape

(41,)

In [33]:
df_result = pd.read_parquet(PATH_Result, engine='pyarrow')

In [34]:
df_result.head()

Unnamed: 0,path,label,features
0,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[0.6685475, 0.113827415, 0.0, 0.0, 0.0, 1.0088..."
1,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[0.63903767, 0.036415994, 0.0, 0.0, 0.0, 0.865..."
2,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[1.049216, 0.15010181, 0.0, 0.0, 0.0, 0.784024..."
3,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[0.58045775, 0.21252142, 0.0, 0.0, 0.04099558,..."
4,file:/home/jovyan/work/fruits-360_dataset/frui...,Apple Braeburn,"[1.5858144, 0.174534, 0.0, 0.0, 0.37722304, 0...."


In [35]:
df_result.loc[0,'features'].shape

(1280,)