# Installation des packages

In [0]:
# Install the Python dependencies used by this notebook into the current environment.
# NOTE(review): versions are not pinned, so re-runs may pick up different releases;
# consider `%pip install` (targets the kernel's environment) with pinned versions.
!pip install Pandas pillow tensorflow pyspark pyarrow

Collecting tensorflow
  Downloading tensorflow-2.11.0-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (588.3 MB)
[?25l[K     |                                | 10 kB 23.8 MB/s eta 0:00:25[K     |                                | 20 kB 29.1 MB/s eta 0:00:21[K     |                                | 30 kB 35.2 MB/s eta 0:00:17[K     |                                | 40 kB 24.6 MB/s eta 0:00:24[K     |                                | 51 kB 19.4 MB/s eta 0:00:31[K     |                                | 61 kB 22.0 MB/s eta 0:00:27[K     |                                | 71 kB 23.8 MB/s eta 0:00:25[K     |                                | 81 kB 22.6 MB/s eta 0:00:27[K     |                                | 92 kB 24.4 MB/s eta 0:00:25[K     |                                | 102 kB 24.0 MB/s eta 0:00:25[K     |                                | 112 kB 24.0 MB/s eta 0:00:25[K     |                                | 122 kB 24.0 MB/s eta 0:00:25[K     |     

# Import des librairies

In [0]:
import io
import os
from typing import Iterator

import numpy as np
import pandas as pd
from PIL import Image

import tensorflow as tf
from tensorflow.keras import Model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

In [0]:
# DBFS root of the uploaded data, the directory holding the images,
# and the output location for the raw CNN features.
PATH = '/FileStore/tables'
PATH_Data = f'{PATH}/test'
PATH_Result = f'{PATH_Data}/Results'
print(f'PATH:        {PATH}\nPATH_Data:   {PATH_Data}\nPATH_Result: {PATH_Result}')

PATH:        /FileStore/tables
PATH_Data:   /FileStore/tables/test
PATH_Result: /FileStore/tables/test/Results


In [0]:
from pyspark.sql import SparkSession

# Get (or create) the SparkSession for this notebook. Parquet output is written
# in Spark's legacy format (spark.sql.parquet.writeLegacyFormat).
# NOTE(review): getOrCreate() returns an already-running session if there is one
# (e.g. on Databricks), in which case these builder options may not take effect.
spark = (
    SparkSession.builder
    .appName('P8')
    .master('local')
    .config("spark.sql.parquet.writeLegacyFormat", 'true')
    .getOrCreate()
)

In [0]:
# Handle on the underlying SparkContext; used below to broadcast the model weights.
sc = spark.sparkContext

In [0]:
# Display the SparkSession object as the cell output.
spark

In [0]:
# NOTE(review): this re-binds PATH (set to '/FileStore/tables' in an earlier cell)
# to the image directory, with a trailing slash; later cells that build paths
# from PATH see this value.
PATH = '/FileStore/tables/test/'

# Load every *.jpg under PATH, sub-directories included, as one row per file
# (path, modificationTime, length, content).
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH)
)

In [0]:
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

# The class label is the name of the image's parent directory,
# e.g. ".../test/Apple Red 3/45_100.jpg" -> "Apple Red 3".
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# printSchema() and show() print by themselves and return None; wrapping them
# in print() only added stray "None" lines to the output.
images.printSchema()
images.select('path', 'label').show(5, False)

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------------------------------------+-----------+
|path                                              |label      |
+--------------------------------------------------+-----------+
|dbfs:/FileStore/tables/test/Apple Red 3/45_100.jpg|Apple Red 3|
|dbfs:/FileStore/tables/test/Apple Red 3/4_100.jpg |Apple Red 3|
|dbfs:/FileStore/tables/test/Apple Red 3/44_100.jpg|Apple Red 3|
|dbfs:/FileStore/tables/test/Apple Red 3/46_100.jpg|Apple Red 3|
|dbfs:/FileStore/tables/test/Apple Red 3/9_100.jpg |Apple Red 3|
+--------------------------------------------------+-----------+
only showing top 5 rows

None


# Préparation du modèle

In [0]:
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input

# MobileNetV2 with ImageNet-pretrained weights, built with its classification
# head (include_top=True) for 224x224 RGB inputs.
model = MobileNetV2(input_shape=(224, 224, 3), include_top=True, weights='imagenet')

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5


In [0]:
from tensorflow.keras import Model

# Cut off the final layer: the new model outputs the activations of the
# second-to-last layer, which serve as image features.
penultimate_output = model.layers[-2].output
new_model = Model(inputs=model.input, outputs=penultimate_output)

In [0]:
# Print the layer-by-layer architecture of the truncated feature extractor.
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 224, 224, 3  0           []                               
                                )]                                                                
                                                                                                  
 Conv1 (Conv2D)                 (None, 112, 112, 32  864         ['input_1[0][0]']                
                                )                                                                 
                                                                                                  
 bn_Conv1 (BatchNormalization)  (None, 112, 112, 32  128         ['Conv1[0][0]']                  
                                )                                                             

In [0]:
# Broadcast the feature extractor's weights so that model_fn() can rebuild the
# model on executors from brodcast_weights.value.
# (The misspelled name is kept on purpose: model_fn() refers to it.)
brodcast_weights = sc.broadcast(new_model.get_weights())

In [0]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor used inside the pandas UDF.

    The network is created with its classification head, then truncated at the
    second-to-last layer. All of its weights are loaded from the broadcast
    variable ``brodcast_weights``.

    :return: a non-trainable Keras Model taking (224, 224, 3) inputs and
             returning the output of ``model.layers[-2]``.
    """
    # weights=None: set_weights() below overwrites every weight of the truncated
    # model, so downloading the ImageNet weight file on each executor is wasted work.
    model = MobileNetV2(weights=None,
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                      outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [0]:
def preprocess(content):
    """
    Decode raw image bytes and prepare them for MobileNetV2 prediction.

    :param content: encoded image file content (bytes), as read by Spark's
                    binaryFile source.
    :return: float array of shape (224, 224, 3) passed through
             mobilenet_v2.preprocess_input.
    """
    with Image.open(io.BytesIO(content)) as raw:
        # Force 3 channels: grayscale, palette, RGBA or CMYK files would otherwise
        # yield arrays whose last dimension is not 3, breaking np.stack / the model.
        img = raw.convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.

    :param model: Keras model whose ``predict`` is applied to the whole batch.
    :param content_series: pd.Series of encoded image bytes.
    :return: a pd.Series of image features, one flattened array per input
             image, in the same order as ``content_series``.
    """
    if content_series.empty:
        # np.stack raises on an empty sequence; an empty batch has no features.
        return pd.Series([], dtype=object)
    batch = np.stack(content_series.map(preprocess).tolist())
    # verbose=0 stops predict() from printing a progress bar for every batch.
    preds = model.predict(batch, verbose=0)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    return pd.Series([p.flatten() for p in preds])

@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    '''
    Scalar-iterator pandas UDF wrapping featurize_series.

    The Iterator[pd.Series] -> Iterator[pd.Series] type hints make Spark treat this
    as a scalar-iterator pandas UDF, without the deprecated PandasUDFType.SCALAR_ITER.
    The 'array<float>' DDL string declares the output as ArrayType(FloatType).

    :param content_series_iter: iterator over batches of data, where each batch
                                is a pandas Series of raw image bytes.
    :return: iterator of pandas Series of feature arrays, one Series per input batch.
    '''
    # With scalar-iterator pandas UDFs, the model is loaded once and then re-used
    # for multiple data batches. This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



# Extraction de features

In [0]:
# Spread the images over 20 partitions and compute CNN features per image.
# The query is lazy: the UDF actually runs when the DataFrame is written below.
features_df = (
    images
    .repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

In [0]:
# Show where the raw CNN features will be written.
print(PATH_Result)

/FileStore/tables/test/Results


In [0]:
# Run the featurization and persist the raw CNN features as Parquet,
# replacing any previous output at PATH_Result.
features_df.write.mode("overwrite").parquet(PATH_Result)

In [0]:
# Reload the persisted features; later cells work from this DataFrame
# instead of re-running the featurization UDF.
df = spark.read.parquet(PATH_Result)

In [0]:
# Preview the reloaded features (show() defaults to 20 truncated rows).
df.show()

+--------------------+------------------+--------------------+
|                path|             label|            features|
+--------------------+------------------+--------------------+
|dbfs:/FileStore/t...|       Apple Red 1|[1.372402, 0.0, 0...|
|dbfs:/FileStore/t...|Apple Crimson Snow|[0.29964697, 0.0,...|
|dbfs:/FileStore/t...|    Apple Braeburn|[0.7345138, 0.014...|
|dbfs:/FileStore/t...|    Apple Braeburn|[1.0135366, 0.170...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[0.62396353, 0.17...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[0.21275485, 0.09...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[0.57049716, 0.04...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[0.5124905, 0.042...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[0.8905549, 0.074...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[0.6859673, 0.059...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[0.31911242, 0.02...|
|dbfs:/FileStore/t...|    Apple Braeburn|[0.78315455, 0.05...|
|dbfs:/FileStore/t...|       Apple Red 1|[1.5206081, 0.

# Réduction de dimension

## Transformer des tableaux en vecteurs pour effectuer une réduction

In [0]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf  # not imported anywhere else: NameError on a fresh kernel

# Wrap each array<float> feature row in a DenseVector, the vector type expected by
# pyspark.ml estimators such as PCA. Null arrays stay null instead of raising.
array_to_vector_udf = udf(
    lambda arr: Vectors.dense(arr) if arr is not None else None,
    VectorUDT(),
)

In [0]:
# Add a 'cnn_vectors' column holding the features as ML vectors (input for PCA).
vectorized_df = df.withColumn('cnn_vectors', array_to_vector_udf('features'))

In [0]:
# Preview 5 rows with truncated cell contents.
vectorized_df.show(5, True)

+--------------------+------------------+--------------------+--------------------+
|                path|             label|            features|         cnn_vectors|
+--------------------+------------------+--------------------+--------------------+
|dbfs:/FileStore/t...|       Apple Red 1|[1.372402, 0.0, 0...|[1.37240195274353...|
|dbfs:/FileStore/t...|Apple Crimson Snow|[0.29964697, 0.0,...|[0.29964697360992...|
|dbfs:/FileStore/t...|    Apple Braeburn|[0.7345138, 0.014...|[0.73451381921768...|
|dbfs:/FileStore/t...|    Apple Braeburn|[1.0135366, 0.170...|[1.01353657245635...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[0.62396353, 0.17...|[0.62396353483200...|
+--------------------+------------------+--------------------+--------------------+
only showing top 5 rows



## Initialiser et appliquer la PCA

In [0]:
import time
from pyspark.ml.feature import PCA
# Fit a PCA projecting the 'cnn_vectors' column onto k=3 principal components;
# the fitted model writes its projections to 'pca_vectors'.
# NOTE(review): `model` is re-bound here to the fitted PCAModel, replacing the
# MobileNetV2 model bound to the same name earlier in the notebook.
# NOTE(review): the message says "best k", but k is fixed to 3; no search over k happens.
start = time.perf_counter()
pca = PCA(k=3, inputCol='cnn_vectors', outputCol='pca_vectors')
model = pca.fit(vectorized_df)
stop = time.perf_counter()
print(f'pca - fit best k nb, elapsed time: {stop - start:0.2f}s')

pca - fit best k nb, elapsed time: 14.47s


In [0]:
# apply pca reduction
# NOTE(review): transform() returns a lazily evaluated DataFrame, so this timing most
# likely covers only building the query plan; the projection itself runs when an
# action such as show() or write is triggered.
start = time.perf_counter()
reduced_df = model.transform(vectorized_df)
stop = time.perf_counter()
print(f'pca - application, elapsed time: {stop - start:0.2f}s')

pca - application, elapsed time: 0.43s


In [0]:
# Preview 5 rows, now including the PCA projections in 'pca_vectors'.
reduced_df.show(5, True)

+--------------------+------------------+--------------------+--------------------+--------------------+
|                path|             label|            features|         cnn_vectors|         pca_vectors|
+--------------------+------------------+--------------------+--------------------+--------------------+
|dbfs:/FileStore/t...|       Apple Red 1|[1.372402, 0.0, 0...|[1.37240195274353...|[-9.9779935044115...|
|dbfs:/FileStore/t...|Apple Crimson Snow|[0.29964697, 0.0,...|[0.29964697360992...|[-4.6278018289284...|
|dbfs:/FileStore/t...|    Apple Braeburn|[0.7345138, 0.014...|[0.73451381921768...|[-7.6928211864106...|
|dbfs:/FileStore/t...|    Apple Braeburn|[1.0135366, 0.170...|[1.01353657245635...|[-8.1791749824692...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[0.62396353, 0.17...|[0.62396353483200...|[-7.7172630299062...|
+--------------------+------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



## Transformation inverse : des vecteurs aux tableaux

In [0]:
from pyspark.sql.functions import udf  # not imported anywhere else: NameError on a fresh kernel

# Convert an ML vector back into a plain list stored as a Spark array column.
# The explicit 'array<double>' return type matters: udf() defaults to StringType,
# which would store the list's text representation instead of an array.
# Null vectors stay null instead of raising.
vector_to_array_udf = udf(
    lambda v: v.toArray().tolist() if v is not None else None,
    'array<double>',
)

In [0]:
# Overwrite 'features' with the PCA projection as a plain array column;
# the original CNN vector remains available in 'cnn_vectors'.
final_df = reduced_df.withColumn('features', vector_to_array_udf('pca_vectors'))

In [0]:
# Preview 5 rows with truncated cell contents.
final_df.show(5, True)

+--------------------+------------------+--------------------+--------------------+--------------------+
|                path|             label|            features|         cnn_vectors|         pca_vectors|
+--------------------+------------------+--------------------+--------------------+--------------------+
|dbfs:/FileStore/t...|       Apple Red 1|[-9.9779935044115...|[1.37240195274353...|[-9.9779935044115...|
|dbfs:/FileStore/t...|Apple Crimson Snow|[-4.6278018289284...|[0.29964697360992...|[-4.6278018289284...|
|dbfs:/FileStore/t...|    Apple Braeburn|[-7.6928211864106...|[0.73451381921768...|[-7.6928211864106...|
|dbfs:/FileStore/t...|    Apple Braeburn|[-8.1791749824692...|[1.01353657245635...|[-8.1791749824692...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[-7.7172630299062...|[0.62396353483200...|[-7.7172630299062...|
+--------------------+------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



# Stockage des résultats

In [0]:
# Output location for the PCA-reduced results, next to the image data.
# Built from PATH_Data rather than PATH: PATH is re-bound mid-notebook
# (with and without a trailing slash), so concatenating onto it was fragile.
PATH_feat = PATH_Data + '/Resultsfinal'

In [0]:
# Show where the PCA-reduced results will be written.
print(PATH_feat)

/FileStore/tables/test/Resultsfinal


In [0]:
# Persist the PCA-reduced DataFrame as Parquet (replacing any previous run)
# and report how long the write took.
start = time.perf_counter()
(final_df.write
    .mode('overwrite')
    .parquet(PATH_feat))
stop = time.perf_counter()
elapsed = stop - start
print(f'write local, elapsed time: {elapsed:0.2f}s')

write local, elapsed time: 1.88s


In [0]:
# Read back the Parquet results written to PATH_feat by the previous cell.
# (Reading a different hard-coded path would silently load another dataset.)
# NOTE(review): despite the pd_ prefix this is a Spark DataFrame until .toPandas();
# the read is lazy, so the timing mostly reflects schema discovery.
start = time.perf_counter()
pd_final_df = spark.read.parquet(PATH_feat)
stop = time.perf_counter()
print(f'read local, elapsed time: {stop - start:0.2f}s')

read local, elapsed time: 0.39s


In [0]:
# Preview the reloaded results (show() defaults to 20 truncated rows).
pd_final_df.show()

+--------------------+------------------+--------------------+--------------------+--------------------+
|                path|             label|            features|         cnn_vectors|         pca_vectors|
+--------------------+------------------+--------------------+--------------------+--------------------+
|dbfs:/FileStore/t...|       Apple Red 1|[-9.9779935044115...|[1.37240195274353...|[-9.9779935044115...|
|dbfs:/FileStore/t...|Apple Crimson Snow|[-4.6278018289284...|[0.29964697360992...|[-4.6278018289284...|
|dbfs:/FileStore/t...|    Apple Braeburn|[-7.6928211864106...|[0.73451381921768...|[-7.6928211864106...|
|dbfs:/FileStore/t...|    Apple Braeburn|[-8.1791749824692...|[1.01353657245635...|[-8.1791749824692...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[-7.7172630299062...|[0.62396353483200...|[-7.7172630299062...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[-0.2328548307550...|[0.21275484561920...|[-0.2328548307550...|
|dbfs:/FileStore/t...|   Apple Pink Lady|[3.02284692449

In [0]:
# Collect the results to the driver as a pandas DataFrame.
# NOTE(review): VectorUDT columns such as cnn_vectors are not supported by Arrow,
# so Spark falls back to the slower non-Arrow conversion (see the warning below).
# NOTE(review): this re-binds pd_final_df to the pandas result, so re-running this
# cell on its own fails; a distinct name would make it idempotent.
pd_final_df=pd_final_df.toPandas()

  Unable to convert the field cnn_vectors. If this column is not necessary, you may consider dropping it or converting to primitive type before the conversion.
Direct cause: Unsupported type in conversion to Arrow: VectorUDT()
Attempting non-optimization as 'spark.sql.execution.arrow.pyspark.fallback.enabled' is set to true.
  warn(msg)


In [0]:
# Summarise the pandas DataFrame, including deep (object-aware) memory usage.
pd_final_df.info(verbose=False, memory_usage="deep")

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 191 entries, 0 to 190
Columns: 5 entries, path to pca_vectors
dtypes: object(5)
memory usage: 71.3 KB


In [0]:
# Display the first 20 rows.
pd_final_df.head(20)

Unnamed: 0,path,label,features,cnn_vectors,pca_vectors
0,dbfs:/FileStore/tables/test/Apple Red 1/5_100.jpg,Apple Red 1,"[-9.977993504411515, -0.034702230601073275, 0....","[1.3724019527435303, 0.0, 0.0, 0.0, 0.29786100...","[-9.977993504411515, -0.034702230601073275, 0...."
1,dbfs:/FileStore/tables/test/Apple Crimson Snow...,Apple Crimson Snow,"[-4.6278018289284, -0.06404771849065835, 0.838...","[0.2996469736099243, 0.0, 0.0, 0.0, 0.0, 0.001...","[-4.6278018289284, -0.06404771849065835, 0.838..."
2,dbfs:/FileStore/tables/test/Apple Braeburn/9_1...,Apple Braeburn,"[-7.692821186410652, 0.3525782574296277, 0.762...","[0.7345138192176819, 0.014667997136712074, 0.0...","[-7.692821186410652, 0.3525782574296277, 0.762..."
3,dbfs:/FileStore/tables/test/Apple Braeburn/33_...,Apple Braeburn,"[-8.179174982469236, 0.18441106170675578, 0.65...","[1.0135365724563599, 0.1703416109085083, 0.0, ...","[-8.179174982469236, 0.18441106170675578, 0.65..."
4,dbfs:/FileStore/tables/test/Apple Pink Lady/79...,Apple Pink Lady,"[-7.717263029906234, 0.3264440938157566, 0.526...","[0.6239635348320007, 0.17311465740203857, 0.0,...","[-7.717263029906234, 0.3264440938157566, 0.526..."
5,dbfs:/FileStore/tables/test/Apple Pink Lady/r_...,Apple Pink Lady,"[-0.23285483075502672, 0.1032065296573911, 0.9...","[0.21275484561920166, 0.09000739455223083, 0.0...","[-0.23285483075502672, 0.1032065296573911, 0.9..."
6,dbfs:/FileStore/tables/test/Apple Pink Lady/r_...,Apple Pink Lady,"[3.022846924498873, 0.24327182145965393, 0.646...","[0.5704971551895142, 0.0498691201210022, 0.010...","[3.022846924498873, 0.24327182145965393, 0.646..."
7,dbfs:/FileStore/tables/test/Apple Pink Lady/r_...,Apple Pink Lady,"[2.924266833611801, 0.7354113542578532, 0.9273...","[0.5124905109405518, 0.04227013513445854, 0.0,...","[2.924266833611801, 0.7354113542578532, 0.9273..."
8,dbfs:/FileStore/tables/test/Apple Pink Lady/22...,Apple Pink Lady,"[-8.100881025859787, 0.25038529582707836, 0.79...","[0.8905549049377441, 0.0740971490740776, 0.0, ...","[-8.100881025859787, 0.25038529582707836, 0.79..."
9,dbfs:/FileStore/tables/test/Apple Pink Lady/24...,Apple Pink Lady,"[-7.472347010417352, 0.06347121839574706, 0.90...","[0.6859673261642456, 0.059126920998096466, 0.0...","[-7.472347010417352, 0.06347121839574706, 0.90..."
