# 1. Local

In [2]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [3]:
!pip install Pandas pillow tensorflow pyspark pyarrow

Collecting pyspark
  Downloading pyspark-3.5.0.tar.gz (316.9 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m316.9/316.9 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.0-py2.py3-none-any.whl size=317425344 sha256=3cbaf149b7e2f49d359ab9376d6e77612008de5023656c68f8aaa30e10ae7990
  Stored in directory: /root/.cache/pip/wheels/41/4e/10/c2cf2467f71c678cfc8a6b9ac9241e5e44a01940da8fbb17fc
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.0


## 3.4 Import des librairies

In [4]:
import pandas as pd
from PIL import Image
import numpy as np
import io


import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split
from pyspark.sql import SparkSession

## 3.5 Définition des PATH pour charger les images <br /> et enregistrer les résultats

Dans cette version locale nous partons du principe que les données <br />
sont stockées dans le même répertoire que le notebook.<br />
Nous n'utilisons qu'un extrait de **300 images** à traiter dans cette <br />
première version en local.<br />
L'extrait des images à charger est stockée dans le dossier **Test1**.<br />
Nous enregistrerons le résultat de notre traitement <br />
dans le dossier "**Results_Local**"

In [5]:
!pip install opendatasets
import opendatasets as od

Collecting opendatasets
  Downloading opendatasets-0.1.22-py3-none-any.whl (15 kB)
Installing collected packages: opendatasets
Successfully installed opendatasets-0.1.22


In [6]:
od.download(
    "https://www.kaggle.com/datasets/moltean/fruits/data")

Please provide your Kaggle credentials to download this dataset. Learn more: http://bit.ly/kaggle-creds
Your Kaggle username: mariabouchehboun
Your Kaggle Key: ··········
Downloading fruits.zip to ./fruits


100%|██████████| 1.28G/1.28G [00:11<00:00, 117MB/s]





In [7]:
PATH_Data = '/content/fruits/fruits-360_dataset/fruits-360/Training/**'
PATH_Result = '/content/drive/MyDrive/OC: Data Science/Livrables Soutenance/projet 8/compressed_img'

## 3.6 Création de la SparkSession


In [8]:
spark = (SparkSession
             .builder
             .appName('P8')
             .master('local')
             .config("spark.sql.parquet.writeLegacyFormat", 'true')
             .getOrCreate()
)

<u>Nous créons également la variable "**sc**" qui est un **SparkContext** issue de la variable **spark**</u> :

In [9]:
sc = spark.sparkContext

<u>Affichage des informations de Spark en cours d'execution</u> :

In [10]:
spark

## 3.7 Traitement des données






### 3.7.1 Chargement des données


In [11]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [12]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

root
 |-- path: string (nullable = true)
 |-- modificationTime: timestamp (nullable = true)
 |-- length: long (nullable = true)
 |-- content: binary (nullable = true)
 |-- label: string (nullable = true)

None
+--------------------------------------------------------------------------------------+--------------+
|path                                                                                  |label         |
+--------------------------------------------------------------------------------------+--------------+
|file:/content/fruits/fruits-360_dataset/fruits-360/Training/Raspberry/176_100.jpg     |Raspberry     |
|file:/content/fruits/fruits-360_dataset/fruits-360/Training/Raspberry/179_100.jpg     |Raspberry     |
|file:/content/fruits/fruits-360_dataset/fruits-360/Training/Pineapple Mini/170_100.jpg|Pineapple Mini|
|file:/content/fruits/fruits-360_dataset/fruits-360/Training/Raspberry/157_100.jpg     |Raspberry     |
|file:/content/fruits/fruits-360_dataset/fruits-360/Training/R

### 3.7.2 Préparation du modèle

J

In [13]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/mobilenet_v2/mobilenet_v2_weights_tf_dim_ordering_tf_kernels_1.0_224.h5


In [14]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

Affichage du résumé de notre nouveau modèle où nous constatons <br />
que <u>nous récupérons bien en sortie un vecteur de dimension (1, 1, 1280)</u> :

In [15]:
new_model.summary()

Model: "model"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_1 (InputLayer)        [(None, 224, 224, 3)]        0         []                            
                                                                                                  
 Conv1 (Conv2D)              (None, 112, 112, 32)         864       ['input_1[0][0]']             
                                                                                                  
 bn_Conv1 (BatchNormalizati  (None, 112, 112, 32)         128       ['Conv1[0][0]']               
 on)                                                                                              
                                                                                                  
 Conv1_relu (ReLU)           (None, 112, 112, 32)         0         ['bn_Conv1[0][0]']        

Tous les workeurs doivent pouvoir accéder au modèle ainsi qu'à ses poids. <br />
Une bonne pratique consiste à charger le modèle sur le driver puis à diffuser <br />
ensuite les poids aux différents workeurs.

In [16]:
brodcast_weights = sc.broadcast(new_model.get_weights())

<u>Mettons cela sous forme de fonction</u> :

In [17]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

### 3.7.3 Définition du processus de chargement des images et application <br/>de leur featurisation à travers l'utilisation de pandas UDF

Ce notebook définit la logique par étapes, jusqu'à Pandas UDF.

<u>L'empilement des appels est la suivante</u> :

- Pandas UDF
  - featuriser une série d'images pd.Series
   - prétraiter une image

In [18]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)



### 3.7.4 Exécution des actions d'extraction de features

Les Pandas UDF, sur de grands enregistrements (par exemple, de très grandes images), <br />
peuvent rencontrer des erreurs de type Out Of Memory (OOM).<br />
Si vous rencontrez de telles erreurs dans la cellule ci-dessous, <br />
essayez de réduire la taille du lot Arrow via 'maxRecordsPerBatch'

Je n'utiliserai pas cette commande dans ce projet <br />
et je laisse donc la commande en commentaire.

In [19]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

Nous pouvons maintenant exécuter la featurisation sur l'ensemble de notre DataFrame Spark.<br />
<u>REMARQUE</u> : Cela peut prendre beaucoup de temps, tout dépend du volume de données à traiter. <br />

Notre jeu de données de **Test** contient **22819 images**. <br />
Cependant, dans l'exécution en mode **local**, <br />
nous <u>traiterons un ensemble réduit de **330 images**</u>.

In [20]:
features_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)


In [22]:
def resize_img(img_data):
    # Improved resize function
    img = Image.open(io.BytesIO(img_data)).resize((224, 224), resample=Image.Resampling.BICUBIC)
    # i use simpler image to array conversion
    arr = np.array(img)
    return arr

def pca_img(image_arr):
    # this applies for each image
    # loading sklearn PCA set to n features
    pca = PCA(100)
    # apply resize funciton and gather all that in a stack 224*224
    resized_imgs = np.stack(image_arr.map(resize_img))
    # flatten stack for PCA
    imgs_flat = [image.flatten() for image in resized_imgs]
    # apply PCA and obtain new array with features
    pca_features = pca.fit_transform(imgs_flat)
    # remake an image from the PCA reduction
    pca_images = pca.inverse_transform(pca_features)
    # re-flatten to include as 1d array on tables
    pca_imgs_flat = [image.flatten() for image in pca_images]
    return pd.Series(pca_imgs_flat)

@pandas_udf('array<int>', PandasUDFType.SCALAR_ITER)
def pca_udf(img_series):
    for image in img_series:
        yield pca_img(image)

In [23]:
resized_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            pca_udf("content").alias("compressed_img")
                                            )

<u>Rappel du PATH où seront inscrits les fichiers au format "**parquet**" <br />
contenant nos résultats, à savoir, un DataFrame contenant 3 colonnes</u> :
 1. Path des images
 2. Label de l'image
 3. Vecteur de caractéristiques de l'image

In [24]:
print(PATH_Result)

/content/drive/MyDrive/OC: Data Science/Livrables Soutenance/projet 8/compressed_img


<u>Enregistrement des données traitées au format "**parquet**"</u> :

In [25]:
resized_df.write.mode("overwrite").parquet(PATH_Result)

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/lib/python3.10/dist-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/usr/local/lib/python3.10/dist-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.10/socket.py", line 705, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: ignored

## 3.8 Chargement des données enregistrées et validation du résultat

<u>On charge les données fraichement enregistrées dans un **DataFrame Pandas**</u> :

In [None]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

<u>On affiche les 5 premières lignes du DataFrame</u> :

In [None]:
df.head()

<u>On valide que la dimension du vecteur de caractéristiques des images est bien de dimension 1280</u> :

In [None]:
df.loc[0,'features'].shape

In [None]:
df.to_csv('/content/drive/MyDrive/OC: Data Science/Livrables Soutenance/projet 8/fichier.csv')

# 2. Cloud

In [None]:
import pandas as pd
import numpy as np
import io
import os
import tensorflow as tf
from PIL import Image
from tensorflow.keras.applications.mobilenet_v2 import MobileNetV2, preprocess_input
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras import Model
from pyspark.sql.functions import col, pandas_udf, PandasUDFType, element_at, split

In [None]:
PATH = 's3://fruit-project'
PATH_Data = PATH+'/Test'
PATH_Result = PATH
print('PATH:        '+\
      PATH+'\nPATH_Data:   '+\
      PATH_Data+'\nPATH_Result: '+PATH_Result)

In [None]:
images = spark.read.format("binaryFile") \
  .option("pathGlobFilter", "*.jpg") \
  .option("recursiveFileLookup", "true") \
  .load(PATH_Data)

In [None]:
images.show(5)

In [None]:
images = images.withColumn('label', element_at(split(images['path'], '/'),-2))
print(images.printSchema())
print(images.select('path','label').show(5,False))

In [None]:
model = MobileNetV2(weights='imagenet',
                    include_top=True,
                    input_shape=(224, 224, 3))

In [None]:
new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)

In [None]:
brodcast_weights = sc.broadcast(new_model.get_weights())

In [None]:
new_model.summary()

In [None]:
def model_fn():
    """
    Returns a MobileNetV2 model with top layer removed
    and broadcasted pretrained weights.
    """
    model = MobileNetV2(weights='imagenet',
                        include_top=True,
                        input_shape=(224, 224, 3))
    for layer in model.layers:
        layer.trainable = False
    new_model = Model(inputs=model.input,
                  outputs=model.layers[-2].output)
    new_model.set_weights(brodcast_weights.value)
    return new_model

In [None]:
def preprocess(content):
    """
    Preprocesses raw image bytes for prediction.
    """
    img = Image.open(io.BytesIO(content)).resize([224, 224])
    arr = img_to_array(img)
    return preprocess_input(arr)

def featurize_series(model, content_series):
    """
    Featurize a pd.Series of raw images using the input model.
    :return: a pd.Series of image features
    """
    input = np.stack(content_series.map(preprocess))
    preds = model.predict(input)
    # For some layers, output features will be multi-dimensional tensors.
    # We flatten the feature tensors to vectors for easier storage in Spark DataFrames.
    output = [p.flatten() for p in preds]
    return pd.Series(output)

@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def featurize_udf(content_series_iter):
    '''
    This method is a Scalar Iterator pandas UDF wrapping our featurization function.
    The decorator specifies that this returns a Spark DataFrame column of type ArrayType(FloatType).

    :param content_series_iter: This argument is an iterator over batches of data, where each batch
                              is a pandas Series of image data.
    '''
    # With Scalar Iterator pandas UDFs, we can load the model once and then re-use it
    # for multiple data batches.  This amortizes the overhead of loading big models.
    model = model_fn()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

In [None]:
# spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [None]:
features_df = images.repartition(24).select(col("path"),
                                            col("label"),
                                            featurize_udf("content").alias("features")
                                           )

In [None]:
def resize_img(img_data):
    # Improved resize function
    img = Image.open(io.BytesIO(img_data)).resize((224, 224), resample=Image.Resampling.BICUBIC)
    # i use simpler image to array conversion
    arr = np.array(img)
    return arr

def pca_img(image_arr):
    # this applies for each image
    # loading sklearn PCA set to n features
    pca = PCA(100)
    # apply resize funciton and gather all that in a stack 224*224
    resized_imgs = np.stack(image_arr.map(resize_img))
    # flatten stack for PCA
    imgs_flat = [image.flatten() for image in resized_imgs]
    # apply PCA and obtain new array with features
    pca_features = pca.fit_transform(imgs_flat)
    # remake an image from the PCA reduction
    pca_images = pca.inverse_transform(pca_features)
    # re-flatten to include as 1d array on tables
    pca_imgs_flat = [image.flatten() for image in pca_images]
    return pd.Series(pca_imgs_flat)

@pandas_udf('array<int>', PandasUDFType.SCALAR_ITER)
def pca_udf(img_series):
    for image in img_series:
        yield pca_img(image)

In [None]:
resized_df = images.repartition(20).select(col("path"),
                                            col("label"),
                                            pca_udf("content").alias("compressed_img")
                                            )

In [None]:
print(PATH_Result)

In [None]:
features_df.write.mode("overwrite").parquet(PATH_Result)

In [None]:
df = pd.read_parquet(PATH_Result, engine='pyarrow')

In [None]:
df.head()

In [None]:
df.loc[0,'features'].shape

In [None]:
df.shape

In [None]:
#on arrete la session
spark.stop()