### 4.10.1 Démarrage de la session Spark

In [None]:
# L'exécution de cette cellule démarre l'application Spark

<u>Affichage des informations sur la session en cours et liens vers Spark UI</u> :

In [None]:
%%info

### 4.10.2 Installation des packages

Les packages nécessaires ont été installés via l'étape de **bootstrap** à l'instanciation du serveur.

### 4.10.3 Import des librairies

In [None]:
import pandas as pd
from PIL import Image
import numpy as np
import io
import os

import tensorflow as tf
from keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from keras.utils import img_to_array
from keras.models import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import udf
from pyspark import SparkConf, SparkContext

### 4.10.4 Définition des PATH pour charger les images et enregistrer les résultats

Nous accédons directement à nos **données sur S3** comme si elles étaient **stockées localement**.

In [None]:
# Initialise the S3 locations used by the notebook.
# PATH already ends with '/', so the sub-folders are appended without a
# leading slash: the previous 's3://p8-lucile-data//Test' form contains an
# empty path segment, which not every S3 client collapses the same way.
PATH = 's3://p8-lucile-data/'
PATH_Data = PATH + 'Test'
PATH_Result = PATH + 'Results'

### 4.10.5 Traitement des données

#### 4.10.5.1 Chargement des données

In [None]:
# Load every '*.jpg' file found under PATH_Data (sub-directories included)
# through Spark's 'binaryFile' data source.
images = (
    spark.read.format('binaryFile')
    .option('pathGlobFilter', '*.jpg')
    .option('recursiveFileLookup', 'true')
    .load(PATH_Data)
)

In [None]:
# Preview the first 5 rows of the loaded image DataFrame.
images.show(5)

<u>J'ajoute une colonne contenant le **label** de chaque image <br />
    (nom de son dossier parent) et je n'affiche que le **path** et le **label**</u> :

In [None]:
# The label is the second-to-last '/'-separated segment of the path,
# i.e. the name of the directory that contains the image.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))
# printSchema() and show() write to stdout themselves and return None, so
# wrapping them in print() only appended a stray "None" line to the output.
images.printSchema()
images.select('path', 'label').show(5, False)

#### 4.10.5.2 Préparation du modèle

In [None]:
# Full MobileNetV2 (classification head included) with ImageNet weights,
# built for 224x224 RGB inputs.
model = MobileNetV2(weights='imagenet', include_top=True, input_shape=(224, 224, 3))

In [None]:
# Feature extractor: same input as `model`, output taken from its
# second-to-last layer (the final layer is left out).
new_model = Model(inputs=model.input, outputs=model.layers[-2].output)

In [None]:
# Broadcast the truncated model's weights through the SparkContext `sc`;
# model_fn() reads them back via `brodcast_weights.value`.
brodcast_weights = sc.broadcast(new_model.get_weights())

In [None]:
# Print the layer-by-layer summary of the truncated model.
new_model.summary()

In [None]:
def model_fn():
    """
    Build the MobileNetV2 feature extractor and load the broadcast weights.

    The architecture is created without pretrained weights: every weight of
    the truncated model is overwritten by ``brodcast_weights.value`` right
    after construction, so loading the ImageNet weights here first (as the
    previous version did) was redundant work on every call.

    :return: a Keras ``Model`` mapping the MobileNetV2 input to the output of
             its second-to-last layer, carrying the broadcast weights.
    """
    model = MobileNetV2(
        weights=None,
        include_top=True,
        input_shape=(224, 224, 3)
    )

    # The network is only used for inference; mark every layer as frozen.
    for layer in model.layers:
        layer.trainable = False

    # Drop the final layer: features come from the second-to-last layer.
    new_model = Model(
        inputs=model.input,
        outputs=model.layers[-2].output
    )
    new_model.set_weights(brodcast_weights.value)

    return new_model

#### 4.10.5.3 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

In [None]:
def preprocess(content):
    """
    Decode raw image bytes into an array ready for the model.

    :param content: encoded image file content (bytes).
    :return: the array returned by ``preprocess_input`` for the image
             converted to RGB and resized to 224x224.
    """
    # Force three RGB channels: grayscale, CMYK or palette images would
    # otherwise produce arrays with a different channel count, which breaks
    # the batch stacking and the (224, 224, 3) model input downstream.
    img = Image.open(io.BytesIO(content)).convert('RGB').resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Compute one feature vector per raw image in a pandas Series.

    :param model: object whose ``predict`` is called on the stacked batch.
    :param content_series: pd.Series of raw image bytes.
    :return: pd.Series holding one flattened feature array per image.
    """
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # Some layers output multi-dimensional tensors; each one is flattened to
    # a 1-D vector so it can be stored in a Spark DataFrame column.
    flattened = []
    for prediction in predictions:
        flattened.append(prediction.flatten())
    return pd.Series(flattened)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    """
    Scalar Iterator pandas UDF around ``featurize_series``.

    The decorator declares the result as a Spark column of type
    ``array<float>``.

    :param content_series_iter: iterator over batches, each batch being a
                                pandas Series of image data.
    :return: generator yielding one pandas Series of features per batch.
    """
    # Build the model a single time, then reuse it for every incoming batch
    # so the loading cost is not paid per batch.
    feature_extractor = model_fn()
    for batch in content_series_iter:
        yield featurize_series(feature_extractor, batch)

#### 4.10.5.4 Exécution des actions d'extraction de features

In [None]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [None]:
# Redistribute the images over 24 partitions, then keep path and label and
# compute the 'features' column with the featurisation UDF.
features_df = (
    images
    .repartition(24)
    .select(col('path'), col('label'), featurize_udf('content').alias('features'))
)

In [None]:
# Wrap each feature array in a dense Spark ML vector; the resulting
# 'features_vector' column is the one fed to PCA further down.
to_vector_udf = udf(lambda values: Vectors.dense(values), VectorUDT())
features_vector_df = features_df.withColumn(
    'features_vector',
    to_vector_udf('features'),
)

In [None]:
# Cumulative explained variance for every candidate k in 1..max_k.
# Spark ML reports each component's explained variance as a share of the
# total variance, so the i-th value should not depend on the k requested
# (NOTE(review): confirm against the Spark version in use). A single fit at
# max_k therefore yields every cumulative sum, instead of 100 separate fits
# that each re-ran the whole featurisation pipeline.
max_k = 100
pca_max_model = PCA(
    k=max_k, inputCol='features_vector', outputCol='pca_features'
).fit(features_vector_df)
cumulative_variance = np.cumsum(pca_max_model.explainedVariance.toArray())
# Store plain Python floats (not numpy scalars) so the list can be turned
# into a Spark DataFrame without relying on numpy-aware schema inference.
variance_df = [
    (k, float(cumulative_variance[k - 1])) for k in range(1, max_k + 1)
]

In [None]:
# Materialise the (k, explained variance) pairs as a Spark DataFrame.
# Values are cast to built-in int/float first: numpy scalars are not accepted
# by every PySpark version's schema inference.
variance_rows = [(int(k), float(v)) for k, v in variance_df]
variance_spark_df = spark.createDataFrame(variance_rows, ['k', 'explained_variance'])

# Cumulative explained variance never decreases with k, so picking the row
# with the highest value always returned the largest k tested. Keep instead
# the smallest k that reaches the target share of variance, falling back to
# the largest k tested when the target is never reached.
variance_target = 0.95
optimal_k = next(
    (k for k, v in sorted(variance_rows) if v >= variance_target),
    max(k for k, _ in variance_rows),
)

print(f'Optimal number of components: {optimal_k}')

In [None]:
# Fit PCA with the selected number of components, then project the feature
# vectors into the 'pca_features' column.
pca = PCA(inputCol='features_vector', outputCol='pca_features', k=optimal_k)
pca_model = pca.fit(features_vector_df)
pca_result_df = pca_model.transform(features_vector_df)

In [None]:
# Keep only the columns that are persisted: path, label and reduced features.
result_df = pca_result_df.select(col('path'), col('label'), col('pca_features'))

In [None]:
# Persist the reduced features as Parquet under PATH_Result, replacing any
# previous output at that location.
result_df.write.parquet(PATH_Result, mode='overwrite')

### 4.10.6 Chargement des données enregistrées et validation du résultat

In [None]:
# Read the Parquet output back into a pandas DataFrame with the pyarrow engine.
df = pd.read_parquet(PATH_Result, engine = 'pyarrow')

In [None]:
# Display the first rows of the reloaded results.
df.head()

In [None]:
# The saved data holds 'path', 'label' and 'pca_features'; there is no
# 'features' column, so the previous lookup raised a KeyError.
# NOTE(review): a Spark ML vector column is expected to come back from
# Parquet as a struct (type, size, indices, values), i.e. a dict per row in
# pandas, hence the 'values' lookup. Confirm against the actual output.
np.asarray(df.loc[0, 'pca_features']['values']).shape

In [None]:
# (rows, columns) of the reloaded results.
df.shape