## 1.1 Démarrage de la session Spark

In [None]:
# Running this cell starts the Spark application (session).
# NOTE(review): `spark` and `sc` are used below but never created in this
# notebook; presumably the kernel (e.g. a Livy/sparkmagic PySpark kernel)
# provides them — confirm.

## 1.2 Import des librairies

In [None]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

### 1.2.1 Définition des chemins (paths)

In [None]:
# Root S3 bucket, folder holding the input images, and folder receiving the results.
PATH = 's3://monbucketprojet8'
PATH_Data = PATH + '/Test1'
PATH_Result = PATH + '/Results'
print(f'PATH:        {PATH}\nPATH_Data:   {PATH_Data}\nPATH_Result: {PATH_Result}')

## 2 Traitement des données

### 2.1 Chargement des images

In [None]:
# Load every .jpg file found (recursively) under PATH_Data as a binary row.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

In [None]:
# The label is the name of the image's parent directory: the second-to-last
# segment of its path.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() print by themselves and return None; wrapping them
# in print() only added a spurious "None" line to the output.
images.printSchema()
images.select('path', 'label').show(6, False)

### 2.2 Préparation du modèle

### 2.2.1 Chargement du modèle **MobileNetV2**

In [None]:
# MobileNetV2 with its ImageNet weights and classification head, for
# 224x224 inputs with 3 channels.
model = MobileNetV2(input_shape=(224, 224, 3), include_top=True, weights='imagenet')

### 2.2.2 Suppression de la dernière couche

In [None]:
# Same network minus its last layer: the output is the activation of the
# second-to-last layer, used below as the image feature vector.
new_model = Model(
    inputs=model.input,
    outputs=model.layers[-2].output,
)

Broadcast des poids du modèle vers les workers

In [None]:
# Ship the truncated model's weights to the executors once as a broadcast
# variable; model_fn() reads them back through `.value`.
brodcast_weights = sc.broadcast(value=new_model.get_weights())

In [None]:
# Print the layer-by-layer architecture of the truncated model, to check that
# the last (classification) layer is gone.
new_model.summary()

On regroupe ces étapes dans une fonction `model_fn`, qui reconstruit le modèle tronqué et lui applique les poids diffusés (broadcast).

In [None]:
def model_fn():
    """
    Rebuild the truncated MobileNetV2 and load the broadcast weights into it.

    The architecture is instantiated with ``weights=None`` (no pretrained
    weights fetched here); all layers are frozen, the classification layer is
    dropped, and the weights taken from ``brodcast_weights`` are applied.

    :return: a Keras ``Model`` whose output is the second-to-last layer.
    """
    base = MobileNetV2(weights=None)
    # Inference only: freeze every layer.
    for lyr in base.layers:
        lyr.trainable = False
    truncated = Model(inputs=base.input, outputs=base.layers[-2].output)
    truncated.set_weights(brodcast_weights.value)
    return truncated

### 2.2.3 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [None]:
def preprocess(content):
    """
    Preprocess raw image bytes for MobileNetV2 prediction.

    :param content: raw bytes of an encoded image file.
    :return: array of shape (224, 224, 3) passed through MobileNetV2's
             ``preprocess_input``.
    """
    # The context manager closes the decoder once the pixels have been copied.
    with Image.open(io.BytesIO(content)) as img:
        # Force 3 channels: grayscale ('L'), CMYK or RGBA images would
        # otherwise produce 1- or 4-channel arrays that break np.stack and do
        # not match the model's (224, 224, 3) input.
        rgb = img.convert("RGB").resize((224, 224))
    arr = img_to_array(rgb)
    return preprocess_input(arr)

def featurize_series(model, content_series, preprocess_fn=None):
    """
    Featurize a pd.Series of raw images using the input model.

    :param model: object exposing a Keras-style ``predict(batch)`` method.
    :param content_series: pd.Series of raw image bytes.
    :param preprocess_fn: optional callable mapping one raw image to an array;
                          defaults to ``preprocess`` (MobileNetV2 preprocessing).
    :return: a pd.Series, indexed like ``content_series``, whose elements are
             1-D feature arrays (one per image).
    """
    if preprocess_fn is None:
        preprocess_fn = preprocess
    # np.stack raises on an empty sequence, so short-circuit empty batches.
    if content_series.empty:
        return pd.Series([], index=content_series.index, dtype=object)
    batch = np.stack(content_series.map(preprocess_fn).tolist())
    preds = model.predict(batch)
    # For some layers, output features will be multi-dimensional tensors.
    # Flatten them to vectors for easier storage in Spark DataFrames.
    features = [p.flatten() for p in preds]
    return pd.Series(features, index=content_series.index)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    Scalar-iterator pandas UDF turning batches of raw image bytes into feature
    vectors; the resulting Spark column has type ArrayType(FloatType).

    :param content_series_iter: iterator over batches, each batch being a
                                pandas Series of image data.
    '''
    # Build the model once for the whole iterator rather than once per batch,
    # so the loading cost is shared by every batch processed by this call.
    featurizer = model_fn()
    yield from (featurize_series(featurizer, batch) for batch in content_series_iter)

### 2.2.3.1 Réduction de dimension PCA


In [None]:
# Spread the images over 20 partitions, then compute one feature vector per
# image with the pandas UDF, keeping its path and label.
features_df = (
    images
    .repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

Transformation des features en **vecteur**

In [None]:
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf

# StandardScaler and PCA expect a Spark ML Vector column, whereas the pandas
# UDF produced array<float>: wrap each array into a dense vector.
to_vector_udf = udf(lambda values: Vectors.dense(values), VectorUDT())
df = features_df.withColumn("features", to_vector_udf("features"))

Application d'un **StandardScaler**

In [None]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, PCA

# Center and scale every feature dimension (withMean / withStd) before PCA.
scaler = StandardScaler(
    inputCol='features', outputCol='scaledFeatures', withStd=True, withMean=True,
).fit(df)

# transform() appends 'scaledFeatures'; the original 'features' column is kept.
df_scaled = scaler.transform(df)
df_scaled.show(6)

Application de la **PCA**

In [None]:
# Number of principal components kept after dimensionality reduction.
n_components = 3
pca = PCA(k=n_components, inputCol='scaledFeatures', outputCol='pcaFeatures').fit(df_scaled)

In [None]:
# Project onto the principal components, keeping only identifiers and the
# reduced features.
df_pca = pca.transform(df_scaled).select('path', 'label', 'pcaFeatures')
print('Explained Variance Ratio', pca.explainedVariance.toArray())
df_pca.show(6)

Transformation inverse des features réduites (vecteurs Spark ML) en **tableaux** (`array`)

In [None]:
from pyspark.ml.functions import vector_to_array

# Turn the ML Vector column back into a plain array column before saving.
df_l = (
    df_pca
    .withColumn("pcaFeaturesArray", vector_to_array("pcaFeatures"))
    .select('path', 'label', 'pcaFeaturesArray')
)

In [None]:
# Show where the results are about to be written.
print(PATH_Result)

Sauvegarde des résultats au format **parquet**

In [None]:
# NOTE(review): commented-out duplicate of the write performed in the next cell; can be deleted.
# df_l.write.mode("overwrite").parquet(PATH_Result)

In [None]:
# Save path, label and PCA feature arrays as Parquet, replacing any previous output.
df_l.write.parquet(PATH_Result, mode="overwrite")

### 2.2.3.2 Chargement des données enregistrées et validation du résultat

In [None]:
# NOTE(review): dead alternative — SparkSession has no `read_parquet` (the Spark
# API is `spark.read.parquet`) and `engine` is a pandas argument; can be deleted.
# df = spark.read_parquet(PATH_Result, engine='pyarrow')

In [None]:
# Install the Parquet engine used by pd.read_parquet(..., engine='fastparquet') below.
# NOTE(review): version is unpinned; pin it (fastparquet==X.Y.Z) for reproducible runs.
%pip install fastparquet

In [None]:
# Reload the saved results into a pandas DataFrame to validate them.
df = pd.read_parquet(path=PATH_Result, engine='fastparquet')

In [None]:
# Preview the first rows of the reloaded results.
df.head()

On vérifie qu'on a bien 3 features 

In [None]:
# Every row (not only the first) must carry exactly 3 PCA features, matching
# the n_components used to fit the PCA.
assert df['pcaFeaturesArray'].map(len).eq(3).all(), "some rows do not have 3 PCA features"
# np.asarray makes .shape work whether the engine returns lists or arrays.
np.asarray(df.loc[0, 'pcaFeaturesArray']).shape