In [3]:
import io
import os
from typing import Iterator

import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from pyspark.ml.feature import PCA
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from tensorflow.keras import Model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

In [4]:
# Spark session for a standalone/local run. It is commented out because the
# notebook uses `spark` (and `sc`) without defining them, i.e. they are
# presumably provided by the PySpark kernel of the cluster — NOTE(review): confirm.
# Uncommenting this also requires `from pyspark.sql import SparkSession`,
# which is not imported above.
#spark = (SparkSession
#             .builder
#             .appName('P8')
#             .master('local') # to change; see the available master options
#             .config("spark.sql.parquet.writeLegacyFormat", 'true')
#             .getOrCreate()
#)

In [16]:
# `sc` is used below (sc.broadcast) but never assigned here; presumably the
# kernel pre-defines it — uncomment for a run where only `spark` exists.
#sc = spark.sparkContext

In [11]:
# Root of the data. Defaults to the S3 bucket used on the cluster; for a local
# run set the P8_DATA_PATH environment variable (e.g. to './data') instead of
# editing/commenting lines in this cell.
PATH = os.environ.get('P8_DATA_PATH', 's3://ericbossutp8fruits')
PATH_Data = PATH + '/Test1'       # input images, one sub-directory per label
PATH_Result = PATH + '/Results'   # output of the featurization + PCA pipeline
print(f'PATH:        {PATH}\nPATH_Data:   {PATH_Data}\nPATH_Result: {PATH_Result}')

PATH:        ./data
PATH_Data:   ./data/Test1
PATH_Result: ./data/Results


In [12]:
# Read every .jpg under PATH_Data (searching sub-directories too) as one row
# per file: path, modificationTime, length and the raw bytes in `content`.
images = (
    spark.read
    .format("binaryFile")
    .options(pathGlobFilter="*.jpg", recursiveFileLookup="true")
    .load(PATH_Data)
)

In [13]:
images.show(n=5, truncate=True)  # peek at the first rows, long values truncated

+--------------------+--------------------+------+--------------------+
|                path|    modificationTime|length|             content|
+--------------------+--------------------+------+--------------------+
|file:/C:/Users/Er...|2023-08-05 17:27:...|  7301|[FF D8 FF E0 00 1...|
|file:/C:/Users/Er...|2023-08-05 17:27:...|  7059|[FF D8 FF E0 00 1...|
|file:/C:/Users/Er...|2023-08-05 17:27:...|  7020|[FF D8 FF E0 00 1...|
|file:/C:/Users/Er...|2023-08-05 17:27:...|  6697|[FF D8 FF E0 00 1...|
|file:/C:/Users/Er...|2023-08-05 17:27:...|  6617|[FF D8 FF E0 00 1...|
+--------------------+--------------------+------+--------------------+
only showing top 5 rows



In [14]:
# The class label is the name of each image's parent directory
# (.../Test1/<label>/<image>.jpg), i.e. the second-to-last path component.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print by themselves and return None, so they are
# called bare: wrapping them in print() only added a stray "None" line.
images.printSchema()
images.select('path', 'label').show(5, truncate=False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+----------------------------------------------------------------------------+--------------+
|path                                                                        |label         |
+----------------------------------------------------------------------------+--------------+
|file:/C:/Users/Eric/P8_Mode_opératoire/data/Test1/Watermelon/r_79_100.jpg   |Watermelon    |
|file:/C:/Users/Eric/P8_Mode_opératoire/data/Test1/Pineapple Mini/314_100.jpg|Pineapple Mini|
|file:/C:/Users/Eric/P8_Mode_opératoire/data/Test1/Pineapple Mini/307_100.jpg|Pineapple Mini|
|file:/C:/Users/Eric/P8_Mode_opératoire/data/Test1/Raspberry/213_100.jpg     |Raspberry     |
|file:/C:/Users/Eric/P8_Mode_opératoire/data/Test1/Raspberry/87_100.jpg      |Raspberry     |
+-------------------------------------

In [17]:
# Build the pretrained MobileNetV2 once on the driver and cut it at the
# penultimate layer (layers[-2]), dropping the final classification layer
# that include_top=True adds, so the network outputs feature vectors.
model = MobileNetV2(input_shape=(224, 224, 3),
                    include_top=True,
                    weights='imagenet')
new_model = Model(model.input, model.layers[-2].output)

# Ship the weights to the executors once; model_fn() loads them from here.
# (sic: the misspelled name is what model_fn reads, so it is kept.)
brodcast_weights = sc.broadcast(new_model.get_weights())

In [18]:
def model_fn():
    """
    Rebuild the MobileNetV2 feature extractor (top layer removed) with the
    broadcasted pretrained weights.

    The architecture is created with ``weights=None`` and then loaded with
    ``brodcast_weights.value`` (the driver-side ImageNet weights), so the
    ImageNet weight file is not downloaded again on every executor.

    :return: a Keras ``Model`` mapping a batch of (224, 224, 3) images to the
             output of MobileNetV2's penultimate layer.
    """
    # weights=None: the ImageNet weights arrive through the broadcast below, so
    # downloading them here is wasted work (and may fail on workers without
    # internet access). include_top=True is kept so the layer structure matches
    # the driver-side model whose weights were broadcast.
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    # Inference only: freeze every layer.
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

def preprocess(content):
    """
    Decode raw image bytes and prepare them for MobileNetV2 prediction.

    :param content: the encoded image file (the bytes of the ``content``
                    column produced by the binaryFile reader).
    :return: a float array of shape (224, 224, 3) passed through
             MobileNetV2's ``preprocess_input``.
    """
    with Image.open(io.BytesIO(content)) as img:
        # Force 3 channels: grayscale, CMYK or palette images would otherwise
        # give (224, 224, 1) or (224, 224, 4) arrays that cannot be stacked
        # with RGB images nor fed to the model.
        rgb = img.convert('RGB').resize((224, 224))
    arr = img_to_array(rgb)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.

    :param model: a Keras model whose ``predict`` is applied to the batch of
                  preprocessed images.
    :param content_series: pd.Series of encoded image bytes.
    :return: a pd.Series holding one flattened feature vector (1-D numpy array)
             per input image, in the order of ``content_series``.
    """
    if content_series.empty:
        # np.stack cannot stack zero arrays; an empty batch has no features.
        return pd.Series([], dtype=object)
    batch = np.stack(content_series.map(preprocess))
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # Flatten them to vectors for easier storage in Spark DataFrames.
    return pd.Series([p.flatten() for p in preds])

@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    Scalar Iterator pandas UDF wrapping our featurization function.

    The Iterator[pd.Series] -> Iterator[pd.Series] type hints are how Spark 3
    recognises a scalar-iterator UDF (replacing the deprecated
    PandasUDFType.SCALAR_ITER). The returned Spark column has type
    ArrayType(FloatType).

    :param content_series_iter: iterator over batches of data, where each batch
                                is a pandas Series of raw image bytes.
    :return: iterator yielding one pd.Series of feature vectors per batch.
    '''
    # Build the model once per iterator and re-use it for every batch: this
    # amortizes the cost of constructing the network and loading its weights.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



In [111]:
# Run the CNN featurization over 24 partitions. The result is cached: this is
# by far the most expensive step, and several later actions (PCA fit, show,
# parquet write) consume it; without cache() Spark re-runs the MobileNetV2
# inference for each of those actions.
features_df = (images.repartition(24)
               .select(col("path"),
                       col("label"),
                       featurize_udf("content").alias("features"))
               .cache())

In [112]:
# Spark ML estimators such as PCA expect a Vector column, not array<float>.
# No repartition here: features_df already has 24 partitions, so another
# repartition(24) would only add a full shuffle.
features_df = features_df.select(col("path"),
                                 col("label"),
                                 array_to_vector('features').alias('features_vect'))

In [113]:
from pyspark.ml.feature import PCA

# Fit a PCA projecting 'features_vect' onto 131 principal components.
# NOTE(review): k=131 presumably comes from an explained-variance study not
# shown here — confirm. Also note `model` is rebound here from the MobileNetV2
# network to the fitted PCAModel; the next cells use it as the PCAModel.
pca = PCA(inputCol='features_vect', outputCol='feature_after', k=131)
model = pca.fit(dataset=features_df)


In [114]:
# Project every feature vector with the fitted PCAModel (`model`, from the PCA
# cell) and keep only the columns needed downstream. No repartition: the data
# already has 24 partitions, and reshuffling it would be wasted work.
features_df = model.transform(features_df).select(col("path"),
                                                  col("label"),
                                                  col('feature_after'))
features_df.show()

+--------------------+------------------+--------------------+
|                path|             label|       feature_after|
+--------------------+------------------+--------------------+
|file:/C:/Users/Er...|     Grape White 2|[8.93691537966879...|
|file:/C:/Users/Er...|          Rambutan|[0.94450518650013...|
|file:/C:/Users/Er...|        Banana Red|[-2.8921651253487...|
|file:/C:/Users/Er...|Apple Granny Smith|[-0.0203008172038...|
|file:/C:/Users/Er...|     Grape White 2|[8.24978534437250...|
|file:/C:/Users/Er...|    Cherry Wax Red|[19.8004808999485...|
|file:/C:/Users/Er...|     Tomato Yellow|[5.58895754054543...|
|file:/C:/Users/Er...|     Grape White 4|[7.74074968465153...|
|file:/C:/Users/Er...|           Peach 2|[6.20181304549740...|
|file:/C:/Users/Er...|          Tomato 1|[11.4574803276338...|
|file:/C:/Users/Er...|         Blueberry|[9.67646039602263...|
|file:/C:/Users/Er...|        Grape Pink|[8.61654656906765...|
|file:/C:/Users/Er...|       Huckleberry|[8.44628093878

In [120]:
# Convert the PCA output Vector back to a plain array column (array<double>
# with vector_to_array's default dtype) so the Parquet output is readable as
# lists by pandas/pyarrow. No repartition: the data already has 24 partitions.
features_df = features_df.select(col("path"),
                                 col("label"),
                                 vector_to_array('feature_after').alias('features_PCA'))

In [122]:
# Persist the results, replacing whatever a previous run left at PATH_Result.
features_df.write.parquet(PATH_Result, mode="overwrite")

In [123]:
# Load the Spark output back with pandas to check it.
df = pd.read_parquet(path=PATH_Result, engine='pyarrow')
df.head(n=5)

Unnamed: 0,path,label,features_PCA
0,file:/C:/Users/Eric/P8_Mode_opératoire/data/Te...,Grape White 2,"[8.936915379668793, -9.260633234679819, -3.346..."
1,file:/C:/Users/Eric/P8_Mode_opératoire/data/Te...,Rambutan,"[0.9445051865001396, 9.92379554517056, -5.2737..."
2,file:/C:/Users/Eric/P8_Mode_opératoire/data/Te...,Banana Red,"[-2.8921651253487357, 6.948151641980749, 4.136..."
3,file:/C:/Users/Eric/P8_Mode_opératoire/data/Te...,Apple Granny Smith,"[-0.02030081720382543, -2.9483597756321824, -3..."
4,file:/C:/Users/Eric/P8_Mode_opératoire/data/Te...,Grape White 2,"[8.249785344372508, -9.30670781430237, -3.3028..."


In [125]:
df.loc[0, 'features_PCA'].shape  # length of one reduced feature vector

(131,)

In [None]:
# index=False: the RangeIndex carries no information, and writing it adds an
# unnamed leading column ("Unnamed: 0" when read back with pd.read_csv).
# NOTE(review): this CSV is written inside the Parquet directory produced by
# Spark above; Parquet readers pointed at PATH_Result may choke on it —
# consider writing it next to that directory instead.
# NOTE(review): the features_PCA numpy arrays are stored as their text repr,
# which may round values; the Parquet files remain the lossless copy.
df.to_csv(PATH_Result + "/featuring_with_PCA.csv", index=False)