# Préparation

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive so the
# notebook can read and write files stored on Drive.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
!pip install Pandas pillow tensorflow pyspark pyarrow boto3

In [None]:
import io
import os
from typing import Iterator

import numpy as np
import pandas as pd
import tensorflow as tf
from PIL import Image
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from tensorflow.keras import Model
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array

# SparkSession

## Path

In [None]:
# All locations are resolved from the current working directory.
PATH = os.getcwd()
# Folder the images are loaded from (trailing slash kept on purpose).
PATH_Data = f'{PATH}/drive/MyDrive/Test/'
# Folder the extracted features are written to.
PATH_Result = f'{PATH}/drive/MyDrive/Results'

# Echo the three locations, one per line, so they can be checked at a glance.
print('PATH:        ' + PATH,
      'PATH_Data:   ' + PATH_Data,
      'PATH_Result: ' + PATH_Result,
      sep='\n')

## Spark

In [None]:

# Get (or create) the SparkSession used by the whole notebook:
#   - application name 'P8'
#   - master 'local[4]'
#   - parquet files written with spark.sql.parquet.writeLegacyFormat enabled
spark = SparkSession.builder \
    .appName('P8') \
    .master('local[4]') \
    .config("spark.sql.parquet.writeLegacyFormat", 'true') \
    .getOrCreate()

# Handle on the underlying SparkContext (used later for broadcasting).
sc = spark.sparkContext

print("Nouvelle SparkSession créée avec succès")

# Last expression of the cell: renders the session summary.
spark

# Traitement des données

In [None]:
# Load every *.jpg found (recursively) under PATH_Data as binary files.
images = (
    spark.read.format("binaryFile")
    .option("pathGlobFilter", "*.jpg")
    .option("recursiveFileLookup", "true")
    .load(PATH_Data)
)

# The label is the second-to-last component of the file path,
# i.e. the name of the folder that directly contains the image.
images = images.withColumn('label', element_at(split(images['path'], '/'), -2))

# printSchema() and show() print by themselves and return None, so they must
# not be wrapped in print() (that would emit a stray "None" line).
images.printSchema()
images.select('path', 'label').show(5, False)

## MobileNetV2

In [None]:
def _build_feature_extractor(weights):
    """Build MobileNetV2 truncated at its second-to-last layer.

    Args:
        weights: value forwarded to ``MobileNetV2(weights=...)``
            ('imagenet' to load the pretrained weights, None for an
            uninitialised network whose weights will be set afterwards).

    Returns:
        A Keras ``Model`` with the same input as MobileNetV2 and the output
        of ``layers[-2]`` as its output; all layers are frozen.
    """
    base = MobileNetV2(weights=weights,
                       include_top=True,
                       input_shape=(224, 224, 3))
    for layer in base.layers:
        layer.trainable = False
    return Model(inputs=base.input,
                 outputs=base.layers[-2].output)


# Build the model once on the driver and broadcast its weights.
# (The model has to exist before its weights can be broadcast; referencing
# `new_model` before creating it raises NameError on a fresh kernel.)
new_model = _build_feature_extractor('imagenet')
brodcast_weights = sc.broadcast(new_model.get_weights())


def model_fn():
    """Return the truncated MobileNetV2 loaded with the broadcast weights.

    The network is created with ``weights=None`` because every weight is
    overwritten by the broadcast values right after, so there is no need to
    load the pretrained weights again here.
    """
    new_model = _build_feature_extractor(None)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [None]:
def preprocess(content):
    """Decode raw image bytes into a MobileNetV2-ready array.

    Args:
        content: encoded image file content (bytes).

    Returns:
        The image as an array of shape (224, 224, 3), passed through
        MobileNetV2's ``preprocess_input``.
    """
    # Force 3 channels: grayscale, CMYK or alpha images would otherwise give
    # arrays with 1 or 4 channels, which cannot be stacked with RGB images
    # nor fed to a network built with input_shape=(224, 224, 3).
    img = Image.open(io.BytesIO(content)).convert('RGB').resize((224, 224))
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """Run the model on a batch of raw images and return one flat vector per image.

    Args:
        model: object exposing ``predict`` (the feature-extraction network).
        content_series: pandas Series of raw image contents.

    Returns:
        pandas Series whose elements are the flattened prediction for each
        image of the batch, in the same order.
    """
    batch = np.stack(content_series.map(preprocess))
    predictions = model.predict(batch)
    # Predictions may be multi-dimensional tensors; each one is flattened
    # to a 1-D vector so it can be stored in a Spark DataFrame column.
    flattened = []
    for prediction in predictions:
        flattened.append(prediction.flatten())
    return pd.Series(flattened)

@pandas_udf('array<float>')
def featurize_udf(content_series_iter: Iterator[pd.Series]) -> Iterator[pd.Series]:
    """Iterator-of-Series pandas UDF turning raw image bytes into feature vectors.

    The UDF flavour is declared through the Python type hints
    (Iterator[pd.Series] -> Iterator[pd.Series]), which replaces the
    deprecated ``PandasUDFType.SCALAR_ITER`` argument.

    Args:
        content_series_iter: iterator over batches (pandas Series) of raw
            image contents.

    Yields:
        For each input batch, a pandas Series of flattened feature vectors.
    """
    # With an iterator UDF the model is loaded once and then reused for
    # every batch, which amortizes the cost of loading a big model.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

In [None]:
# Limit each Arrow batch handed to the pandas UDF to 1024 records.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [None]:
# Spread the images over 20 partitions, then keep the path and label and
# add the feature vector computed by the pandas UDF from the raw content.
features_df = (
    images
    .repartition(20)
    .select(
        col("path"),
        col("label"),
        featurize_udf("content").alias("features"),
    )
)

In [None]:
# Show where the extracted features are about to be written.
print(PATH_Result)

In [None]:
# Write the features as parquet, replacing any previous content of PATH_Result.
features_df.write.mode("overwrite").parquet(PATH_Result)

## Réduction PCA

In [None]:
from pyspark.ml.feature import PCA, VectorAssembler
from pyspark.ml.functions import array_to_vector, vector_to_array
from pyspark.ml.linalg import DenseVector, VectorUDT
from pyspark.sql.functions import udf

# Reload the extracted features from the parquet output written to PATH_Result.
features_df = spark.read.parquet(PATH_Result)

# Converter from an array column to a Spark ML vector column. The built-in
# array_to_vector is used instead of a Python lambda UDF; the original name is
# kept so that any other cell calling array_to_vector_udf(column) still works.
array_to_vector_udf = array_to_vector

# PCA needs a vector column: convert "features" from array<float> to a vector.
features_df = features_df.withColumn("features", array_to_vector_udf(features_df["features"]))

# Dimensionality reduction: keep the first 50 principal components.
pca = PCA(k=50, inputCol="features", outputCol="pca_features")
pca_model = pca.fit(features_df)
features_df_pca = pca_model.transform(features_df)

# Sanity check of the reduction: variance explained by each component.
explained_variance = pca_model.explainedVariance
print("Variance expliquée par le PCA :", explained_variance)

# Save the reduced data.
# NOTE(review): this path is under PATH/Results, not under the Drive folder
# used by PATH_Result -- confirm this is the intended location.
output_path_reduced = PATH + "/Results/reduced_features.parquet"
# Store the components as a plain array<double>: an ML vector column is saved
# as a struct, which pandas reads back as a dict instead of a numeric array.
(
    features_df_pca
    .select("path", "label", vector_to_array("pca_features").alias("pca_features"))
    .write.mode("overwrite")
    .parquet(output_path_reduced)
)

print(f"Données réduites enregistrées à {output_path_reduced}")


# Chargement

In [None]:
# Read the reduced features back with pandas to check the saved result.
df = pd.read_parquet(output_path_reduced, engine='pyarrow')

# Only the last expression of a cell is rendered, so the preview has to be
# displayed explicitly.
display(df.head())

first_vector = df.loc[0, 'pca_features']
# A Spark ML vector saved as-is comes back as a struct (dict) whose numbers
# sit under the "values" key; a plain array column comes back as an array.
if isinstance(first_vector, dict):
    first_vector = first_vector['values']

# Shape of one reduced feature vector.
np.asarray(first_vector).shape