In [None]:
import os
import findspark

# Set env variable: extra Maven packages that spark-submit must resolve when
# the PySpark shell is launched.
# NOTE(review): hadoop-aws is pinned to 3.2.1 while the Spark build located
# below is "spark-3.3.0-bin-hadoop3" - confirm that this version matches the
# Hadoop jars bundled with that build.
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages com.amazonaws:aws-java-sdk-pom" \
                                    ":1.12.220,org.apache.hadoop:hadoop-aws:3.2.1 " \
                                    "pyspark-shell"
# "\n" (not "/n") so that a blank line is actually printed after the value.
print("PYSPARK_SUBMIT_ARGS: " + str(os.environ['PYSPARK_SUBMIT_ARGS']) + "\n")

# Locate Spark
findspark.init('/home/ubuntu/spark-3.3.0-bin-hadoop3')
print("Locate Spark: " + str(findspark.find()))

In [None]:
# Pyspark
import pyspark
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import udf

# ML Pyspark
from pyspark.ml.feature import StandardScaler, PCA
from pyspark.ml.linalg import DenseVector, VectorUDT

import boto3 as boto
import io
from io import BytesIO, StringIO

# General libraries
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

# Libraries to deal with images
from PIL import Image
from tensorflow.keras.preprocessing.image import img_to_array
from tensorflow.keras.applications.resnet50 import ResNet50, preprocess_input

import warnings
warnings.filterwarnings('ignore')

## Sommaire <a class="anchor" id="Sommaire"></a>

* [Définition des variables globales et configurations](#Partie1)    
* [Récupération des images depuis S3](#Partie2)
* [Feature engineering à l'aide d'un CNN](#Partie3)
* [Analyse en Composantes Principales (ACP)](#Partie4)
* [Enregistrement sur le cloud](#Partie5)

## Définition des variables globales et configurations <a class="anchor" id="Partie1"></a>

In [None]:
# Set connection variables.
# Credentials must never be written in the notebook: they are read from the
# standard AWS environment variables, which have to be exported before the
# kernel is started (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY).
# Any key that was previously committed in this file should be revoked.
try:
    ACCESS_KEY = os.environ["AWS_ACCESS_KEY_ID"]
    SECRET_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]
except KeyError as missing_variable:
    raise RuntimeError(
        "Missing AWS credentials: export AWS_ACCESS_KEY_ID and "
        "AWS_SECRET_ACCESS_KEY before starting the notebook."
    ) from missing_variable

BUCKET_NAME = "donnees-projet8"
PATH_IN_THE_BUCKET = "Images"
REGION_NAME = "eu-west-1"
# Regional S3 endpoints carry the "s3." service prefix.
END_POINT = f"s3.{REGION_NAME}.amazonaws.com"

In [None]:
# Spark session
spark = (SparkSession.builder
         .appName("Notebook P8")
         # JVM module openings passed to the driver
         .config("spark.driver.extraJavaOptions", 
                 "--add-opens=java.base/sun.nio.ch=ALL-UNNAMED " \
                 "--add-opens=java.base/java.lang=ALL-UNNAMED  " \
                 "--add-opens=java.base/java.util=ALL-UNNAMED")
         # S3A connector settings ("spark.hadoop." keys are forwarded to the
         # Hadoop configuration)
         .config('spark.hadoop.fs.s3a.access.key', ACCESS_KEY)
         .config('spark.hadoop.fs.s3a.secret.key', SECRET_KEY)
         .config('spark.hadoop.fs.s3a.region', REGION_NAME)
         .config('spark.hadoop.fs.s3a.endpoint', END_POINT)
         # Set only once: a second assignment of this key used to override it
         # with a class name that does not exist in the s3a package.
         .config("spark.hadoop.fs.s3a.impl", 'org.apache.hadoop.fs.s3a.S3AFileSystem')
         # Provider for access-key/secret-key authentication; the key needs
         # the "spark.hadoop." prefix to reach the Hadoop configuration.
         .config("spark.hadoop.fs.s3a.aws.credentials.provider",
                 "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

         # Amount of memory to be used for the driver process
         .config("spark.driver.memory","16g")
         # Amount of memory to be used for the executor process
         .config("spark.executor.memory","12g")
         # Number of cores to be used for the executor process
         .config("spark.executor.cores","4")
         .getOrCreate())
    

# Spark context
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.endpoint", END_POINT)
# Signature V4 is requested through a JVM system property (this replaces the
# builder-level "com.amazonaws.services.s3.enableV4" setting).
sc.setSystemProperty("com.amazonaws.services.s3.enableV4", "true")

In [None]:
# Bare expression: the notebook renders the SparkSession object as the cell output.
spark

[Retour au sommaire](#Sommaire)
## Récupération des images depuis S3 <a class="anchor" id="Partie2"></a>

In [None]:
# Access to the S3 data
s3 = boto.resource("s3", 
                   aws_access_key_id = ACCESS_KEY, 
                   aws_secret_access_key = SECRET_KEY, 
                   region_name = REGION_NAME)
mon_bucket = s3.Bucket(BUCKET_NAME)
liste_images = mon_bucket.objects.filter(Prefix = PATH_IN_THE_BUCKET).all()

# Read every image of the bucket and derive its category from the name of
# the parent "folder" in the object key.
data = []
for obj in liste_images:
    key_parts = obj.key.split("/")
    # Skip "folder" placeholder objects (key ending with "/", empty body) and
    # keys without a parent folder, for which no category can be derived.
    if obj.key.endswith("/") or len(key_parts) < 2:
        continue

    file_stream = obj.get()['Body']
    try:
        img = file_stream.read()
    finally:
        # Release the HTTP stream even when the download fails.
        file_stream.close()

    label = key_parts[-2]
    data.append([label, img])

# Columns of the dataframe
schema = StructType([
    StructField("Catégorie", StringType(), True),
    StructField("Image", BinaryType(), True)])

# Creation of the dataframe
spark_df = spark.createDataFrame(data, schema)

print("Nombre d'images récupérées :", str(spark_df.count()), "\n")
spark_df.printSchema()
spark_df.show()

In [None]:
# Display one image.
# take(1) returns a one-element list of rows, so only a single image is
# shipped to the driver instead of the whole column.
img = spark_df.select('Image').take(1)
first_img = img[0][0]
Image.open(io.BytesIO(first_img))

[Retour au sommaire](#Sommaire)
## Feature engineering à l'aide d'un CNN <a class="anchor" id="Partie3"></a>

In [None]:
# ResNet50 pre-trained on ImageNet, built without its classification head and
# with a global average pooling applied to the last convolutional output.
model = ResNet50(weights="imagenet", pooling="avg", include_top=False)

# Print the architecture to check that the top (classification) layer is gone.
model.summary()

In [None]:
# Cache holding the CNN so that it is built on first use and then reused for
# the following rows, instead of being re-instantiated for every image.
# NOTE(review): the cache must stay empty on the driver (never call
# _get_resnet() there), otherwise the model would be serialized with the UDF;
# how long a populated cache lives depends on how Spark ships the UDF to its
# Python workers - confirm on the target cluster.
_RESNET_CACHE = {}


def _get_resnet():
    """Return the headless ResNet50, building it on the first call only."""
    if "model" not in _RESNET_CACHE:
        _RESNET_CACHE["model"] = ResNet50(include_top=False,
                                          weights="imagenet",
                                          pooling="avg")
    return _RESNET_CACHE["model"]


@udf(returnType=VectorUDT())
def Vectorizer(content):
    """Turn the raw bytes of one image into its ResNet50 feature vector.

    Parameters
    ----------
    content : bytes or None
        Encoded image file content (value of the "Image" column).

    Returns
    -------
    DenseVector or None
        Flattened output of the CNN for this image; None when ``content``
        is null.
    """
    # A null cell cannot be decoded: propagate the null instead of failing.
    if content is None:
        return None

    # Open the image; force 3 channels because the network expects RGB input
    # (grayscale, palette or RGBA files would otherwise have the wrong depth).
    img = Image.open(io.BytesIO(content)).convert("RGB")
    arr_img = img_to_array(img)

    # Preprocessing expected by ResNet50
    preprocessed_arr_img = preprocess_input(arr_img)

    # Add the leading batch axis: (height, width, channels) -> (1, h, w, c)
    batch = np.expand_dims(preprocessed_arr_img, axis=0)

    # Forward pass; verbose=0 avoids one progress bar per image in the logs
    X_features = _get_resnet().predict(batch, verbose=0)

    # Flatten the features into a Spark ML vector
    features = X_features.flatten().tolist()
    return DenseVector(features)

In [None]:
# Compute the CNN features. The result is cached because the dataframe is
# reused by several later actions (show, fit, transform, toPandas); without
# it, each of them would run the CNN again on the images.
spark_df = spark_df.withColumn("CNN_Features", Vectorizer(spark_df['Image'])).cache()

spark_df.printSchema()
spark_df.show()

In [None]:
# Only one vector is needed to read the number of features: take(1) returns a
# one-element list of rows instead of collecting every vector on the driver.
features_CNN = spark_df.select('CNN_Features').take(1)
len(features_CNN[0][0])

En sortie du CNN, chaque image est représentée par 2048 features. Pour éviter d'éventuels problèmes de mémoire lors de la classification, nous réalisons une ACP afin de réduire la dimension.

[Retour au sommaire](#Sommaire)
## Analyse en Composantes Principales (ACP) <a class="anchor" id="Partie4"></a>

In [None]:
# Adjust the size of the Arrow record batches: sets the session option
# "spark.sql.execution.arrow.maxRecordsPerBatch" to 1024.
spark.conf.set("spark.sql.execution.arrow.maxRecordsPerBatch", "1024")

In [None]:
# Standardisation of the CNN features (centering and scaling to unit standard
# deviation); the result is written to the "ScaledFeatures" column.
scaler = (StandardScaler()
          .setInputCol("CNN_Features")
          .setOutputCol("ScaledFeatures")
          .setWithStd(True)
          .setWithMean(True))

# Fit on the whole dataframe, then add the scaled column to it.
model_scaler = scaler.fit(spark_df)
spark_df = model_scaler.transform(spark_df)

In [None]:
# There are 22 images, so the number of dimensions is reduced to fewer than 22.
n_comp = 15
pca = (PCA()
       .setK(n_comp)
       .setInputCol("ScaledFeatures")
       .setOutputCol("PCA_Features"))
model_pca = pca.fit(spark_df)

# Cumulative share of variance explained by the first 1..n_comp components.
np.cumsum(model_pca.explainedVariance.toArray())

En sélectionnant 15 composantes, nous expliquons 90% de la variance.

In [None]:
# Apply the fitted PCA model; the dataframe returned by transform() replaces spark_df.
spark_df = model_pca.transform(spark_df)

In [None]:
# Function displaying the scree plot of the eigenvalues
def display_scree_plot(pca):
    """Plot the share of variance carried by each principal component.

    Bars show the percentage of variance per component and the red line shows
    the cumulative percentage.

    Parameters
    ----------
    pca : fitted PCA model
        Either a Spark ML model exposing ``explainedVariance`` or an object
        exposing the scikit-learn style ``explained_variance_ratio_``.
    """
    # The model fitted in this notebook is a Spark model, which has no
    # `explained_variance_ratio_` attribute: read `explainedVariance` first
    # and keep the scikit-learn attribute as a fallback.
    if hasattr(pca, "explainedVariance"):
        ratios = pca.explainedVariance.toArray()
    else:
        ratios = pca.explained_variance_ratio_

    scree = np.asarray(ratios, dtype=float) * 100
    ranks = np.arange(len(scree)) + 1

    plt.bar(ranks, scree)
    plt.plot(ranks, scree.cumsum(), c="red", marker='o')
    plt.xlabel("Rang de l'axe d'inertie")
    plt.ylabel("Pourcentage d'inertie")
    plt.title("Eboulis des valeurs propres")
    plt.show(block=False)

display_scree_plot(model_pca)

In [None]:
# Print the schema of the dataframe after the scaling and PCA steps.
spark_df.printSchema()

In [None]:
# Print the first rows of the dataframe.
spark_df.show()

[Retour au sommaire](#Sommaire)
## Enregistrement sur le cloud <a class="anchor" id="Partie5"></a>

In [None]:
# Convert the Spark dataframe to a pandas dataframe, then display its first
# rows as the cell output.
final_pandas_df = spark_df.toPandas()
final_pandas_df.head()

In [None]:
# Serialise the pandas dataframe as CSV text in memory (no index column).
csv_buffer = io.StringIO()
final_pandas_df.to_csv(csv_buffer, index=False)
csv_payload = csv_buffer.getvalue()

# Upload the CSV text to the bucket under the "Sorties/" prefix.
output_object = s3.Object(BUCKET_NAME, "Sorties/Final_df.csv")
output_object.put(Body=csv_payload)