In [None]:
#pip install tensorflow

In [None]:
# pip install azure-storage-blob

## 1. Import

### 1.1 Import libraries

In [None]:
import pandas as pd
import numpy as np
import io
import os
from PIL import Image
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient

### 1.2 Import données

In [None]:
dbutils.fs.mount(
  source = "blob_container",
  mount_point = "/mnt/p11-mount",
  extra_configs = {"account-key"}
)

In [None]:
PATH = "/mnt/p11-mount/data"
PATH_Data = PATH+'/Test'
PATH_Result = PATH+'/Results'

print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

## 2. Traitement des données
### 2.1 Chargement des données

In [None]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)


### 2.2 Ajout d'une colonne 'label' à notre DataFrame

Nous ajoutons une nouvelle colonne à notre DataFrame 'images'. Cette colonne, appelée 'label', est créée en extrayant le nom du dossier contenant chaque image à partir du chemin de l'image.

Voici comment cela fonctionne :

1. Nous divisons le chemin de chaque image en plusieurs parties en utilisant le caractère '/' comme séparateur.
2. Nous prenons l'avant-dernière partie, qui correspond au nom du dossier contenant l'image.
3. Nous ajoutons cette information comme une nouvelle colonne dans notre DataFrame.

Après avoir ajouté la colonne 'label', nous vérifions son ajout en affichant le schéma du DataFrame et les cinq premières lignes des colonnes 'path' et 'label'.

In [None]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

## 3. Modelisation

### 3.1 Préparation du modèle



In [None]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [None]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

### 3.2 Diffusion des poids du modèle

Nous utilisons la fonction `broadcast` de Spark pour diffuser les poids du modèle à tous les nœuds du cluster. Cela permet d'assurer que tous les nœuds ont une copie des poids du modèle, ce qui est nécessaire pour effectuer des prédictions ou des mises à jour de modèle sur les données distribuées.

In [None]:
brodcast_weights = spark.broadcast(new_model.get_weights())

### 3.3 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF
<u>La séquence d'appels est organisée comme suit</u> :

- Pandas UDF
  - Appliquer la featurisation à une série d'images pd.Series
   - Effectuer le prétraitement d'une image

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed 
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

### 3.4 Définition du processus de chargement des images <br/> et application de leur featurisation à travers l'utilisation de pandas UDF

<u>La séquence d'appels est organisée comme suit</u> :

- Pandas UDF
  - Appliquer la featurisation à une série d'images pd.Series
   - Effectuer le prétraitement d'une image

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

### 3.5 Exécutions des actions d'extractions de features

In [None]:
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [None]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

## 4 PCA

Nous allons maintenant effectuer une **Analyse en Composantes Principales (PCA)** sur notre ensemble de données. 

#### Préparation des données

Pour commencer, nous définissons une **fonction utilisateur (UDF)** qui convertit un tableau en un vecteur. C'est une étape nécessaire car la fonction PCA de PySpark attend des données sous forme de vecteurs.

Ensuite, nous chargeons notre ensemble de données à partir d'un fichier **Parquet**. Parquet est un format de fichier optimisé pour le stockage en colonnes, ce qui est idéal pour les grands ensembles de données.

Une fois les données chargées, nous convertissons une colonne de l'ensemble de données, qui est un tableau de nombres à virgule flottante, en un vecteur à l'aide de l'UDF que nous avons définie.

#### Application de la PCA

Maintenant, nous sommes prêts à appliquer la PCA à la colonne de vecteurs. Nous spécifions que nous voulons deux composantes principales avec le paramètre `k`.

Après avoir appliqué la PCA, nous supprimons la colonne de vecteurs originale de l'ensemble de données. Cela nous permet d'économiser de l'espace, car cette colonne n'est plus nécessaire.

#### Conversion en DataFrame Pandas

Enfin, nous convertissons l'ensemble de données Spark en un **DataFrame Pandas**. Cela nous permet d'utiliser facilement les données avec des bibliothèques Python qui s'attendent à des DataFrames Pandas.

In [None]:
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import udf
from pyspark.ml.linalg import VectorUDT

# Define a UDF to convert array to vector
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())

# Load the data
df = spark.read.parquet(PATH_Result)

# Convert the array of floats to a vector
df = df.withColumn("features_vec", list_to_vector_udf(df["features"]))

# Apply PCA
pca = PCA(k=2, inputCol="features_vec", outputCol="pcaFeatures")
model = pca.fit(df)
result = model.transform(df)
result = result.drop('features_vec')
result = result.drop('features')

# Convert the Spark DataFrame to a Pandas DataFrame
result_pd = result.toPandas()

## 5. Affichage Resultats et sauvegarde

In [None]:
result_pd.head()

In [None]:
# Save the DataFrame to a CSV file
result_pd.to_csv(os.path.join(PATH_Result, 'azure_result.csv'), index=False)