## Imports et création de notre session Spark

In [1]:
# Nos imports
import pyspark
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import StandardScaler
from pyspark.ml.feature import PCA
from pyspark.ml.linalg import Vectors
from pyspark.ml.linalg import VectorUDT
from pyspark.sql import Row
from pyspark.sql.functions import split
from pyspark.sql.functions import col
from pyspark.sql.functions import udf
from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType
from pyspark.sql.types import IntegerType, StringType, ArrayType

import boto3

import pandas as pd
import numpy as np
from PIL import Image

import tensorflow as tf
from tensorflow.keras.applications.inception_v3 import InceptionV3
from tensorflow.keras.applications.inception_v3 import preprocess_input

VBox()

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
1,application_1660313079227_0003,pyspark,idle,Link,Link,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [2]:
# On crée notre environnement Spark
spark = SparkSession.builder.appName('P8_OCR_VLE').getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## Fonctions

In [3]:
def load_img(img_path):
    """
    Prend le chemin dans un bucket S3 d'un objet (une image), le lit et
    le transforme sous la forme d'une liste.

            Parameters:
                img_path : le chemin de l'image dans notre bucket S3
    """
    
    # On récupère nos images en se connectant à notre bucket S3
    s3 = boto3.resource('s3', region_name='eu-west-3')
    bucket = s3.Bucket('p8ocrvle')
    
    # On récupère les objets dans notre bucket
    img = img_path.replace('s3://p8ocrvle/', '')
    bucket_file = bucket.Object(img)
    s3_response = bucket_file.get()
    file_stream = s3_response['Body']
    image = Image.open(file_stream)
    
    # On enlève les images qui ne retournent rien
    if image is None:
        image = 0
        
    else:
        image = np.asarray(image)
        image = np.resize(image, (299, 299, 3))
        image = image.flatten().tolist()
    
    return image

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
def model_pretrained():
    """
    Retourne notre modèle (InceptionV3), sans la couche la plus haute
    (couche de classification) et on y ajoute manuellement les poids pré-entraînés.
    """
    
    model = InceptionV3(weights=None, include_top=False)
    model.set_weights(model_weights.value)
    return model

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
def preprocess_img(data):
    """
    Prend une liste représentative de nos images, les transforme
    en array et change le format de l'image. Retourne les images
    auxquelles on applique le preprocessing propre à notre modèle.
    """
    
    image = np.asarray(data)
    image = np.resize(image, (299, 299, 3))

    return preprocess_input(image)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
def featurize_series(model, content_series):
    """
    Récupère toutes nos images pour y appliquer notre preprocessing
    et on applique notre modèle, avant de faire une classification.
    Retourne une série composée de nos features.
    
            Parameters:
                model : notre modèle (InceptionV3 ici)
                content_series : nos données images
    """
    
    model_input = np.stack(content_series.map(preprocess_img)) 
    preds = model.predict(model_input)
    model_output = [x.flatten() for x in preds]
    return pd.Series(model_output)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
@pandas_udf('array<float>', PandasUDFType.SCALAR_ITER)
def udf_featurize_series(content_series_iter):
    """
    UDF (User Defined Function) wrapper pour notre fonction de featurisation.
    Retourne une colonne de DataFrame Spark de type ArrayType(FloatType)
    
            Parameters:
            content_series_iter : série de nos données images
    """
    model = model_pretrained()
    for content_series in content_series_iter:
        yield featurize_series(model, content_series)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…



## Préparation des données

In [8]:
# On se connecte à notre bucket S3
session = boto3.session.Session(aws_access_key_id='XXX',
                                aws_secret_access_key='XXX')

s3_client = session.client(service_name='s3', region_name='eu-west-3')

# Et on se prépare à lire nos fichiers
prefix = 'Data'
sub_folders = s3_client.list_objects_v2(Bucket='p8ocrvle', Prefix=prefix)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
# On lit les fichiers présent dans notre dossiers "Data"
# On récupère l'emplacement des fichiers
lst_path = []
for key in sub_folders['Contents']:
    file = key['Key']
    file = file.replace(prefix + '/', '')
    lst_path.append('s3://p8ocrvle/Data/' + file)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# On crée notre DataFrame et on y met l'emplacement de fichiers
rdd = spark.sparkContext.parallelize(lst_path)
row_rdd = rdd.map(lambda x: Row(x))
df_pyspark = spark.createDataFrame(row_rdd, ['path'])

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [11]:
# On vérifie
df_pyspark.show(5, truncate=False)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------------------------------------+
|path                                           |
+-----------------------------------------------+
|s3://p8ocrvle/Data/apple_6/r0_4.jpg            |
|s3://p8ocrvle/Data/apple_6/r0_6.jpg            |
|s3://p8ocrvle/Data/apple_pink_lady_1/r0_32.jpg |
|s3://p8ocrvle/Data/apple_pink_lady_1/r0_34.jpg |
|s3://p8ocrvle/Data/apple_red_yellow_1/r0_32.jpg|
+-----------------------------------------------+
only showing top 5 rows

In [12]:
# On récupère nos labels
df_pyspark = df_pyspark.withColumn('label', split(col('path'), '/').getItem(4))

# On ajoute une colonne avec nos données images
udf_image = udf(load_img, ArrayType(IntegerType()))
df_pyspark = df_pyspark.withColumn('data', udf_image('path'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# On vérifie de nouveau
df_pyspark.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+--------------------+
|                path|             label|                data|
+--------------------+------------------+--------------------+
|s3://p8ocrvle/Dat...|           apple_6|[255, 255, 255, 2...|
|s3://p8ocrvle/Dat...|           apple_6|[255, 255, 255, 2...|
|s3://p8ocrvle/Dat...| apple_pink_lady_1|[255, 255, 255, 2...|
|s3://p8ocrvle/Dat...| apple_pink_lady_1|[255, 255, 255, 2...|
|s3://p8ocrvle/Dat...|apple_red_yellow_1|[255, 255, 255, 2...|
+--------------------+------------------+--------------------+
only showing top 5 rows

In [14]:
# On va utiliser un StringIndexer pour encoder nos labels
stringIndexer = StringIndexer(inputCol='label', outputCol='label_encoded')
sI = stringIndexer.fit(df_pyspark)

# On encode et on convertit nos labels en Integer (lisibilité)
image_df = sI.transform(df_pyspark)
image_df = image_df.withColumn('label_encoded', col('label_encoded').cast(IntegerType()))

# On réorganise nos colonnes (lisibilité)
image_df = image_df.select('path', 'label', 'label_encoded', 'data')
image_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+-------------+--------------------+
|                path|             label|label_encoded|                data|
+--------------------+------------------+-------------+--------------------+
|s3://p8ocrvle/Dat...|           apple_6|            0|[255, 255, 255, 2...|
|s3://p8ocrvle/Dat...|           apple_6|            0|[255, 255, 255, 2...|
|s3://p8ocrvle/Dat...| apple_pink_lady_1|            1|[255, 255, 255, 2...|
|s3://p8ocrvle/Dat...| apple_pink_lady_1|            1|[255, 255, 255, 2...|
|s3://p8ocrvle/Dat...|apple_red_yellow_1|            2|[255, 255, 255, 2...|
+--------------------+------------------+-------------+--------------------+
only showing top 5 rows

## Preprocessing de nos données

In [15]:
# On charge notre model et les poids associés
model = InceptionV3(include_top=False)
model_weights = spark.sparkContext.broadcast(model.get_weights()) 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [16]:
# On applique notre preprocessing
features_df = image_df.select(col('path'), col('label'), col('label_encoded'),
                              udf_featurize_series('data').alias('features'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [17]:
# On vérifie
features_df.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+------------------+-------------+--------------------+
|                path|             label|label_encoded|            features|
+--------------------+------------------+-------------+--------------------+
|s3://p8ocrvle/Dat...|           apple_6|            0|[0.0, 0.0, 0.0, 0...|
|s3://p8ocrvle/Dat...|           apple_6|            0|[0.0, 0.06371247,...|
|s3://p8ocrvle/Dat...| apple_pink_lady_1|            1|[0.0, 0.0, 0.0, 0...|
|s3://p8ocrvle/Dat...| apple_pink_lady_1|            1|[0.0, 0.0, 0.0, 0...|
|s3://p8ocrvle/Dat...|apple_red_yellow_1|            2|[0.059556235, 0.0...|
+--------------------+------------------+-------------+--------------------+
only showing top 5 rows

## Réduction de dimensions

Avant d'appliquer une PCA, nous devons centrer et réduire nos données, nous allons donc appliquer le StandardScaler de PySpark.

In [18]:
# On transforme nos données en vecteurs
list_to_vector_udf = udf(lambda l: Vectors.dense(l), VectorUDT())
features_df = features_df.select(col('path'),  col('label'),
                                 list_to_vector_udf(features_df['features']).alias('features_vect'))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# On vérifie
features_df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features_vect: vector (nullable = true)

In [20]:
# On standardise nos données
standardizer = StandardScaler(withMean=True, withStd=True,
                              inputCol='features_vect',
                              outputCol='feats_scaled')
std = standardizer.fit(features_df)
features_df_scaled = std.transform(features_df)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# On vérifie
features_df_scaled.printSchema()
features_df_scaled.show(5) 

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- path: string (nullable = true)
 |-- label: string (nullable = true)
 |-- features_vect: vector (nullable = true)
 |-- feats_scaled: vector (nullable = true)

+--------------------+------------------+--------------------+--------------------+
|                path|             label|       features_vect|        feats_scaled|
+--------------------+------------------+--------------------+--------------------+
|s3://p8ocrvle/Dat...|           apple_6|[0.0,0.0,0.0,0.0,...|[-0.4063785368685...|
|s3://p8ocrvle/Dat...|           apple_6|[0.0,0.0637124702...|[-0.4063785368685...|
|s3://p8ocrvle/Dat...| apple_pink_lady_1|[0.0,0.0,0.0,0.0,...|[-0.4063785368685...|
|s3://p8ocrvle/Dat...| apple_pink_lady_1|[0.0,0.0,0.0,0.0,...|[-0.4063785368685...|
|s3://p8ocrvle/Dat...|apple_red_yellow_1|[0.05955623462796...|[-0.0431709601578...|
+--------------------+------------------+--------------------+--------------------+
only showing top 5 rows

Nous pouvons maintenant appliquer une PCA.

In [22]:
pca = PCA(k=8, inputCol='feats_scaled', outputCol='pca')
modelpca = pca.fit(features_df_scaled)
transformed = modelpca.transform(features_df_scaled)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [23]:
# On regarde la variance expliquée par nos nouvelles données
variance_explained = modelpca.explainedVariance
variance_explained

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DenseVector([0.2144, 0.1765, 0.1511, 0.1084, 0.0975, 0.0899, 0.0842, 0.0781])

## Enregistrement des résultats

In [24]:
# On enregistre dans un nouveau dossier dans notre bucket S3
features_df_scaled.write.parquet(path='s3://p8ocrvle/Features/', mode='overwrite')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…